Merge pull request #67308 from cofyc/fix67260

Use monotonically increasing generation to prevent equivalence cache race
k8s-ci-robot 2018-09-25 00:18:00 -07:00 committed by GitHub
commit 28d86ac47d
12 changed files with 310 additions and 172 deletions

View File

@@ -50,7 +50,6 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
     ],
 )

View File

@@ -555,13 +555,6 @@ func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDi
 	return pdbs, nil
 }
 
-func (cache *schedulerCache) IsUpToDate(n *NodeInfo) bool {
-	cache.mu.RLock()
-	defer cache.mu.RUnlock()
-	node, ok := cache.nodes[n.Node().Name]
-	return ok && n.generation == node.generation
-}
-
 func (cache *schedulerCache) run() {
 	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
 }

View File

@@ -30,7 +30,6 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
-	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/kubernetes/pkg/features"
 	priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
@@ -1336,26 +1335,3 @@ func TestPDBOperations(t *testing.T) {
 		}
 	}
 }
-
-func TestIsUpToDate(t *testing.T) {
-	cache := New(time.Duration(0), wait.NeverStop)
-	if err := cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}); err != nil {
-		t.Errorf("Could not add node: %v", err)
-	}
-	s := cache.Snapshot()
-	node := s.Nodes["n1"]
-	if !cache.IsUpToDate(node) {
-		t.Errorf("Node incorrectly marked as stale")
-	}
-	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1", UID: "p1"}, Spec: v1.PodSpec{NodeName: "n1"}}
-	if err := cache.AddPod(pod); err != nil {
-		t.Errorf("Could not add pod: %v", err)
-	}
-	if cache.IsUpToDate(node) {
-		t.Errorf("Node incorrectly marked as up to date")
-	}
-	badNode := &NodeInfo{node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n2"}}}
-	if cache.IsUpToDate(badNode) {
-		t.Errorf("Nonexistant node incorrectly marked as up to date")
-	}
-}

View File

@@ -123,9 +123,6 @@ type Cache interface {
 	// Snapshot takes a snapshot on current cache
 	Snapshot() *Snapshot
 
-	// IsUpToDate returns true if the given NodeInfo matches the current data in the cache.
-	IsUpToDate(n *NodeInfo) bool
-
 	// NodeTree returns a node tree structure
 	NodeTree() *NodeTree
 }

View File

@@ -27,7 +27,6 @@ go_test(
         "//pkg/scheduler/algorithm:go_default_library",
         "//pkg/scheduler/algorithm/predicates:go_default_library",
         "//pkg/scheduler/cache:go_default_library",
-        "//pkg/scheduler/testing:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@@ -35,6 +35,9 @@ import (
 	hashutil "k8s.io/kubernetes/pkg/util/hash"
 )
 
+// nodeMap stores a *NodeCache for each node.
+type nodeMap map[string]*NodeCache
+
 // Cache is a thread safe map saves and reuses the output of predicate functions,
 // it uses node name as key to access those cached results.
 //
@@ -42,13 +45,23 @@ import (
 // class". (Equivalence class is defined in the `Class` type.) Saved results
 // will be reused until an appropriate invalidation function is called.
 type Cache struct {
-	// i.e. map[string]*NodeCache
-	sync.Map
+	// NOTE(harry): Theoretically sync.Map has better performance in machine with 8+ CPUs, while
+	// the reality is lock contention in first level cache is rare.
+	mu             sync.RWMutex
+	nodeToCache    nodeMap
+	predicateIDMap map[string]int
 }
 
 // NewCache create an empty equiv class cache.
-func NewCache() *Cache {
-	return new(Cache)
+func NewCache(predicates []string) *Cache {
+	predicateIDMap := make(map[string]int, len(predicates))
+	for id, predicate := range predicates {
+		predicateIDMap[predicate] = id
+	}
+	return &Cache{
+		nodeToCache:    make(nodeMap),
+		predicateIDMap: predicateIDMap,
+	}
 }
 
 // NodeCache saves and reuses the output of predicate functions. Use RunPredicate to
@@ -63,34 +76,90 @@ func NewCache() *Cache {
 type NodeCache struct {
 	mu    sync.RWMutex
 	cache predicateMap
+	// generation is current generation of node cache, incremented on node
+	// invalidation.
+	generation uint64
+	// snapshotGeneration saves snapshot of generation of node cache.
+	snapshotGeneration uint64
+	// predicateGenerations stores generation numbers for predicates, incremented on
+	// predicate invalidation. Created on first update. Use 0 if does not
+	// exist.
+	predicateGenerations []uint64
+	// snapshotPredicateGenerations saves snapshot of generation numbers for predicates.
+	snapshotPredicateGenerations []uint64
 }
 
 // newNodeCache returns an empty NodeCache.
-func newNodeCache() *NodeCache {
+func newNodeCache(n int) *NodeCache {
 	return &NodeCache{
-		cache: make(predicateMap),
+		cache:                        make(predicateMap, n),
+		predicateGenerations:         make([]uint64, n),
+		snapshotPredicateGenerations: make([]uint64, n),
 	}
 }
 
+// Snapshot snapshots current generations of cache.
+// NOTE: We snapshot generations of all node caches before using it and these
+// operations are serialized, we can save snapshot as member of node cache
+// itself.
+func (c *Cache) Snapshot() {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	for _, n := range c.nodeToCache {
+		n.mu.Lock()
+		// snapshot predicate generations
+		copy(n.snapshotPredicateGenerations, n.predicateGenerations)
+		// snapshot node generation
+		n.snapshotGeneration = n.generation
+		n.mu.Unlock()
+	}
+	return
+}
+
 // GetNodeCache returns the existing NodeCache for given node if present. Otherwise,
 // it creates the NodeCache and returns it.
 // The boolean flag is true if the value was loaded, false if created.
 func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) {
-	v, exists := c.LoadOrStore(name, newNodeCache())
-	nodeCache = v.(*NodeCache)
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if nodeCache, exists = c.nodeToCache[name]; !exists {
+		nodeCache = newNodeCache(len(c.predicateIDMap))
+		c.nodeToCache[name] = nodeCache
+	}
 	return
 }
 
+// LoadNodeCache returns the existing NodeCache for given node, nil if not
+// present.
+func (c *Cache) LoadNodeCache(node string) *NodeCache {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.nodeToCache[node]
+}
+
+func (c *Cache) predicateKeysToIDs(predicateKeys sets.String) []int {
+	predicateIDs := make([]int, 0, len(predicateKeys))
+	for predicateKey := range predicateKeys {
+		if id, ok := c.predicateIDMap[predicateKey]; ok {
+			predicateIDs = append(predicateIDs, id)
+		} else {
+			glog.Errorf("predicate key %q not found", predicateKey)
+		}
+	}
+	return predicateIDs
+}
+
 // InvalidatePredicates clears all cached results for the given predicates.
 func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
 	if len(predicateKeys) == 0 {
 		return
 	}
-	c.Range(func(k, v interface{}) bool {
-		n := v.(*NodeCache)
-		n.invalidatePreds(predicateKeys)
-		return true
-	})
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	predicateIDs := c.predicateKeysToIDs(predicateKeys)
+	for _, n := range c.nodeToCache {
+		n.invalidatePreds(predicateIDs)
+	}
 	glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
 }
@@ -100,16 +169,22 @@ func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.S
 	if len(predicateKeys) == 0 {
 		return
 	}
-	if v, ok := c.Load(nodeName); ok {
-		n := v.(*NodeCache)
-		n.invalidatePreds(predicateKeys)
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	predicateIDs := c.predicateKeysToIDs(predicateKeys)
+	if n, ok := c.nodeToCache[nodeName]; ok {
+		n.invalidatePreds(predicateIDs)
 	}
 	glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
 }
 
 // InvalidateAllPredicatesOnNode clears all cached results for one node.
 func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
-	c.Delete(nodeName)
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	if node, ok := c.nodeToCache[nodeName]; ok {
+		node.invalidate()
+	}
 	glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
 }
@@ -186,8 +261,8 @@ func NewClass(pod *v1.Pod) *Class {
 	return nil
 }
 
-// predicateMap stores resultMaps with predicate name as the key.
-type predicateMap map[string]resultMap
+// predicateMap stores resultMaps with predicate ID as the key.
+type predicateMap []resultMap
 
 // resultMap stores PredicateResult with pod equivalence hash as the key.
 type resultMap map[uint64]predicateResult
@@ -201,22 +276,22 @@ type predicateResult struct {
 // RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be
 // run and its results cached for the next call.
 //
-// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
+// NOTE: RunPredicate will not update the equivalence cache if generation does not match live version.
 func (n *NodeCache) RunPredicate(
 	pred algorithm.FitPredicate,
 	predicateKey string,
+	predicateID int,
 	pod *v1.Pod,
 	meta algorithm.PredicateMetadata,
 	nodeInfo *schedulercache.NodeInfo,
 	equivClass *Class,
-	cache schedulercache.Cache,
 ) (bool, []algorithm.PredicateFailureReason, error) {
 	if nodeInfo == nil || nodeInfo.Node() == nil {
 		// This may happen during tests.
 		return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
 	}
 
-	result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash)
+	result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, predicateID, equivClass.hash)
 	if ok {
 		return result.Fit, result.FailReasons, nil
 	}
@@ -224,19 +299,17 @@ func (n *NodeCache) RunPredicate(
 	if err != nil {
 		return fit, reasons, err
 	}
-	if cache != nil {
-		n.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo)
-	}
+	n.updateResult(pod.GetName(), predicateKey, predicateID, fit, reasons, equivClass.hash, nodeInfo)
 	return fit, reasons, nil
 }
 
 // updateResult updates the cached result of a predicate.
 func (n *NodeCache) updateResult(
 	podName, predicateKey string,
+	predicateID int,
 	fit bool,
 	reasons []algorithm.PredicateFailureReason,
 	equivalenceHash uint64,
-	cache schedulercache.Cache,
 	nodeInfo *schedulercache.NodeInfo,
 ) {
 	if nodeInfo == nil || nodeInfo.Node() == nil {
@@ -244,11 +317,6 @@ func (n *NodeCache) updateResult(
 		metrics.EquivalenceCacheWrites.WithLabelValues("discarded_bad_node").Inc()
 		return
 	}
-	// Skip update if NodeInfo is stale.
-	if !cache.IsUpToDate(nodeInfo) {
-		metrics.EquivalenceCacheWrites.WithLabelValues("discarded_stale").Inc()
-		return
-	}
 
 	predicateItem := predicateResult{
 		Fit:         fit,
@@ -257,16 +325,24 @@ func (n *NodeCache) updateResult(
 
 	n.mu.Lock()
 	defer n.mu.Unlock()
+	if (n.snapshotGeneration != n.generation) || (n.snapshotPredicateGenerations[predicateID] != n.predicateGenerations[predicateID]) {
+		// Generation of node or predicate has been updated since we last took
+		// a snapshot, this indicates that we received an invalidation request
+		// during this time. Cache may be stale, skip update.
+		metrics.EquivalenceCacheWrites.WithLabelValues("discarded_stale").Inc()
+		return
+	}
 	// If cached predicate map already exists, just update the predicate by key
-	if predicates, ok := n.cache[predicateKey]; ok {
+	if predicates := n.cache[predicateID]; predicates != nil {
 		// maps in golang are references, no need to add them back
 		predicates[equivalenceHash] = predicateItem
 	} else {
-		n.cache[predicateKey] =
+		n.cache[predicateID] =
 			resultMap{
 				equivalenceHash: predicateItem,
 			}
 	}
+	n.predicateGenerations[predicateID]++
 
 	glog.V(5).Infof("Cache update: node=%s, predicate=%s,pod=%s,value=%v",
 		nodeInfo.Node().Name, predicateKey, podName, predicateItem)
@@ -276,11 +352,12 @@ func (n *NodeCache) updateResult(
 // cache entry was found.
 func (n *NodeCache) lookupResult(
 	podName, nodeName, predicateKey string,
+	predicateID int,
 	equivalenceHash uint64,
 ) (value predicateResult, ok bool) {
 	n.mu.RLock()
 	defer n.mu.RUnlock()
-	value, ok = n.cache[predicateKey][equivalenceHash]
+	value, ok = n.cache[predicateID][equivalenceHash]
 	if ok {
 		metrics.EquivalenceCacheHits.Inc()
 	} else {
@@ -289,15 +366,24 @@ func (n *NodeCache) lookupResult(
 	return value, ok
 }
 
-// invalidatePreds deletes cached predicates by given keys.
-func (n *NodeCache) invalidatePreds(predicateKeys sets.String) {
+// invalidatePreds deletes cached predicates by given IDs.
+func (n *NodeCache) invalidatePreds(predicateIDs []int) {
 	n.mu.Lock()
 	defer n.mu.Unlock()
-	for predicateKey := range predicateKeys {
-		delete(n.cache, predicateKey)
+	for _, predicateID := range predicateIDs {
+		n.cache[predicateID] = nil
+		n.predicateGenerations[predicateID]++
 	}
 }
 
+// invalidate invalidates node cache.
+func (n *NodeCache) invalidate() {
+	n.mu.Lock()
+	defer n.mu.Unlock()
+	n.cache = make(predicateMap, len(n.cache))
+	n.generation++
+}
+
 // equivalencePod is the set of pod attributes which must match for two pods to
 // be considered equivalent for scheduling purposes. For correctness, this must
 // include any Pod field which is used by a FitPredicate.
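Since predicate names are translated to dense integer IDs once at construction time, callers now hand the predicate ordering to the constructor, while invalidation stays keyed by name. A hypothetical caller-side sketch follows; the import path and the predicate names are assumptions for illustration only, but the function signatures are the ones shown in the hunks above.

package main

import (
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
)

func main() {
	// The cache is built from the predicate ordering so that each predicate
	// name maps to a dense integer ID (see predicateIDMap above).
	ecache := equivalence.NewCache([]string{"GeneralPredicates", "PodFitsHostPorts"})

	// Node caches are created lazily and sized to the number of predicates.
	nodeCache, existed := ecache.GetNodeCache("node-1")
	_, _ = nodeCache, existed

	// Invalidation is still keyed by predicate name; the cache translates the
	// names to IDs internally before touching the per-node result slices.
	ecache.InvalidatePredicates(sets.NewString("PodFitsHostPorts"))
}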

View File

@@ -28,7 +28,6 @@ import (
 	"k8s.io/kubernetes/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
 	schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
-	schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
 )
 
 // makeBasicPod returns a Pod object with many of the fields populated.
@@ -155,16 +154,6 @@ type predicateItemType struct {
 	reasons []algorithm.PredicateFailureReason
 }
 
-// upToDateCache is a fake Cache where IsUpToDate always returns true.
-type upToDateCache = schedulertesting.FakeCache
-
-// staleNodeCache is a fake Cache where IsUpToDate always returns false.
-type staleNodeCache struct {
-	schedulertesting.FakeCache
-}
-
-func (c *staleNodeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return false }
-
 // mockPredicate provides an algorithm.FitPredicate with pre-set return values.
 type mockPredicate struct {
 	fit bool
@@ -182,7 +171,6 @@ func TestRunPredicate(t *testing.T) {
 	tests := []struct {
 		name                                        string
 		pred                                        mockPredicate
-		cache                                       schedulercache.Cache
 		expectFit, expectCacheHit, expectCacheWrite bool
 		expectedReasons                             []algorithm.PredicateFailureReason
 		expectedError                               string
@@ -190,7 +178,6 @@ func TestRunPredicate(t *testing.T) {
 		{
 			name:             "pod fits/cache hit",
 			pred:             mockPredicate{},
-			cache:            &upToDateCache{},
 			expectFit:        true,
 			expectCacheHit:   true,
 			expectCacheWrite: false,
@@ -198,23 +185,13 @@ func TestRunPredicate(t *testing.T) {
 		{
 			name:             "pod fits/cache miss",
 			pred:             mockPredicate{fit: true},
-			cache:            &upToDateCache{},
 			expectFit:        true,
 			expectCacheHit:   false,
 			expectCacheWrite: true,
 		},
-		{
-			name:             "pod fits/cache miss/no write",
-			pred:             mockPredicate{fit: true},
-			cache:            &staleNodeCache{},
-			expectFit:        true,
-			expectCacheHit:   false,
-			expectCacheWrite: false,
-		},
 		{
 			name:             "pod doesn't fit/cache miss",
 			pred:             mockPredicate{reasons: []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}},
-			cache:            &upToDateCache{},
 			expectFit:        false,
 			expectCacheHit:   false,
 			expectCacheWrite: true,
@@ -223,7 +200,6 @@ func TestRunPredicate(t *testing.T) {
 		{
 			name:             "pod doesn't fit/cache hit",
 			pred:             mockPredicate{},
-			cache:            &upToDateCache{},
 			expectFit:        false,
 			expectCacheHit:   true,
 			expectCacheWrite: false,
@@ -232,7 +208,6 @@ func TestRunPredicate(t *testing.T) {
 		{
 			name:             "predicate error",
 			pred:             mockPredicate{err: errors.New("This is expected")},
-			cache:            &upToDateCache{},
 			expectFit:        false,
 			expectCacheHit:   false,
 			expectCacheWrite: false,
@@ -240,6 +215,8 @@ func TestRunPredicate(t *testing.T) {
 		},
 	}
 
+	predicatesOrdering := []string{"testPredicate"}
+	predicateID := 0
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			node := schedulercache.NewNodeInfo()
@@ -249,15 +226,16 @@ func TestRunPredicate(t *testing.T) {
 			meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
 
 			// Initialize and populate equivalence class cache.
-			ecache := NewCache()
+			ecache := NewCache(predicatesOrdering)
+			ecache.Snapshot()
 			nodeCache, _ := ecache.GetNodeCache(testNode.Name)
 
 			equivClass := NewClass(pod)
 			if test.expectCacheHit {
-				nodeCache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
+				nodeCache.updateResult(pod.Name, "testPredicate", predicateID, test.expectFit, test.expectedReasons, equivClass.hash, node)
 			}
 
-			fit, reasons, err := nodeCache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
+			fit, reasons, err := nodeCache.RunPredicate(test.pred.predicate, "testPredicate", predicateID, pod, meta, node, equivClass)
 
 			if err != nil {
 				if err.Error() != test.expectedError {
@@ -288,7 +266,7 @@ func TestRunPredicate(t *testing.T) {
 			if !test.expectCacheHit && test.pred.callCount == 0 {
 				t.Errorf("Predicate should be called")
 			}
-			_, ok := nodeCache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
+			_, ok := nodeCache.lookupResult(pod.Name, node.Node().Name, "testPredicate", predicateID, equivClass.hash)
 			if !ok && test.expectCacheWrite {
 				t.Errorf("Cache write should happen")
 			}
@@ -303,22 +281,24 @@ func TestRunPredicate(t *testing.T) {
 }
 
 func TestUpdateResult(t *testing.T) {
+	predicatesOrdering := []string{"GeneralPredicates"}
 	tests := []struct {
 		name               string
 		pod                string
 		predicateKey       string
+		predicateID        int
 		nodeName           string
 		fit                bool
 		reasons            []algorithm.PredicateFailureReason
 		equivalenceHash    uint64
 		expectPredicateMap bool
 		expectCacheItem    predicateResult
-		cache              schedulercache.Cache
 	}{
 		{
 			name:            "test 1",
 			pod:             "testPod",
 			predicateKey:    "GeneralPredicates",
+			predicateID:     0,
 			nodeName:        "node1",
 			fit:             true,
 			equivalenceHash: 123,
@@ -326,12 +306,12 @@ func TestUpdateResult(t *testing.T) {
 			expectCacheItem: predicateResult{
 				Fit: true,
 			},
-			cache: &upToDateCache{},
 		},
 		{
 			name:            "test 2",
 			pod:             "testPod",
 			predicateKey:    "GeneralPredicates",
+			predicateID:     0,
 			nodeName:        "node2",
 			fit:             false,
 			equivalenceHash: 123,
@@ -339,7 +319,6 @@ func TestUpdateResult(t *testing.T) {
 			expectCacheItem: predicateResult{
 				Fit: false,
 			},
-			cache: &upToDateCache{},
 		},
 	}
 	for _, test := range tests {
@@ -349,14 +328,14 @@ func TestUpdateResult(t *testing.T) {
 			node.SetNode(testNode)
 
 			// Initialize and populate equivalence class cache.
-			ecache := NewCache()
+			ecache := NewCache(predicatesOrdering)
 			nodeCache, _ := ecache.GetNodeCache(testNode.Name)
 
 			if test.expectPredicateMap {
 				predicateItem := predicateResult{
 					Fit: true,
 				}
-				nodeCache.cache[test.predicateKey] =
+				nodeCache.cache[test.predicateID] =
 					resultMap{
 						test.equivalenceHash: predicateItem,
 					}
@@ -365,15 +344,15 @@ func TestUpdateResult(t *testing.T) {
 			nodeCache.updateResult(
 				test.pod,
 				test.predicateKey,
+				test.predicateID,
 				test.fit,
 				test.reasons,
 				test.equivalenceHash,
-				test.cache,
 				node,
 			)
 
-			cachedMapItem, ok := nodeCache.cache[test.predicateKey]
-			if !ok {
+			cachedMapItem := nodeCache.cache[test.predicateID]
+			if cachedMapItem == nil {
 				t.Errorf("can't find expected cache item: %v", test.expectCacheItem)
 			} else {
 				if !reflect.DeepEqual(cachedMapItem[test.equivalenceHash], test.expectCacheItem) {
@@ -394,18 +373,19 @@ func slicesEqual(a, b []algorithm.PredicateFailureReason) bool {
 }
 
 func TestLookupResult(t *testing.T) {
+	predicatesOrdering := []string{"GeneralPredicates"}
 	tests := []struct {
 		name                              string
 		podName                           string
 		nodeName                          string
 		predicateKey                      string
+		predicateID                       int
 		equivalenceHashForUpdatePredicate uint64
 		equivalenceHashForCalPredicate    uint64
 		cachedItem                        predicateItemType
 		expectedPredicateKeyMiss          bool
 		expectedEquivalenceHashMiss       bool
 		expectedPredicateItem             predicateItemType
-		cache                             schedulercache.Cache
 	}{
 		{
 			name: "test 1",
@@ -414,6 +394,7 @@ func TestLookupResult(t *testing.T) {
 			equivalenceHashForUpdatePredicate: 123,
 			equivalenceHashForCalPredicate:    123,
 			predicateKey:                      "GeneralPredicates",
+			predicateID:                       0,
 			cachedItem: predicateItemType{
 				fit:     false,
 				reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
@@ -423,7 +404,6 @@ func TestLookupResult(t *testing.T) {
 				fit:     false,
 				reasons: []algorithm.PredicateFailureReason{},
 			},
-			cache: &upToDateCache{},
 		},
 		{
 			name: "test 2",
@@ -432,6 +412,7 @@ func TestLookupResult(t *testing.T) {
 			equivalenceHashForUpdatePredicate: 123,
 			equivalenceHashForCalPredicate:    123,
 			predicateKey:                      "GeneralPredicates",
+			predicateID:                       0,
 			cachedItem: predicateItemType{
 				fit: true,
 			},
@@ -440,7 +421,6 @@ func TestLookupResult(t *testing.T) {
 				fit:     true,
 				reasons: []algorithm.PredicateFailureReason{},
 			},
-			cache: &upToDateCache{},
 		},
 		{
 			name: "test 3",
@@ -449,6 +429,7 @@ func TestLookupResult(t *testing.T) {
 			equivalenceHashForUpdatePredicate: 123,
 			equivalenceHashForCalPredicate:    123,
 			predicateKey:                      "GeneralPredicates",
+			predicateID:                       0,
 			cachedItem: predicateItemType{
 				fit:     false,
 				reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
@@ -458,7 +439,6 @@ func TestLookupResult(t *testing.T) {
 				fit:     false,
 				reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
 			},
-			cache: &upToDateCache{},
 		},
 		{
 			name: "test 4",
@@ -467,6 +447,7 @@ func TestLookupResult(t *testing.T) {
 			equivalenceHashForUpdatePredicate: 123,
 			equivalenceHashForCalPredicate:    456,
 			predicateKey:                      "GeneralPredicates",
+			predicateID:                       0,
 			cachedItem: predicateItemType{
 				fit:     false,
 				reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
@@ -477,7 +458,6 @@ func TestLookupResult(t *testing.T) {
 				fit:     false,
 				reasons: []algorithm.PredicateFailureReason{},
 			},
-			cache: &upToDateCache{},
 		},
 	}
 
@@ -486,7 +466,7 @@ func TestLookupResult(t *testing.T) {
 			testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
 
 			// Initialize and populate equivalence class cache.
-			ecache := NewCache()
+			ecache := NewCache(predicatesOrdering)
 			nodeCache, _ := ecache.GetNodeCache(testNode.Name)
 
 			node := schedulercache.NewNodeInfo()
@@ -495,10 +475,10 @@ func TestLookupResult(t *testing.T) {
 			nodeCache.updateResult(
 				test.podName,
 				test.predicateKey,
+				test.predicateID,
 				test.cachedItem.fit,
 				test.cachedItem.reasons,
 				test.equivalenceHashForUpdatePredicate,
-				test.cache,
 				node,
 			)
 			// if we want to do invalid, invalid the cached item
@@ -511,6 +491,7 @@ func TestLookupResult(t *testing.T) {
 			result, ok := nodeCache.lookupResult(test.podName,
 				test.nodeName,
 				test.predicateKey,
+				test.predicateID,
 				test.equivalenceHashForCalPredicate,
 			)
 			fit, reasons := result.Fit, result.FailReasons
@@ -659,6 +640,8 @@ func TestGetEquivalenceHash(t *testing.T) {
 
 func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
 	testPredicate := "GeneralPredicates"
+	testPredicateID := 0
+	predicatesOrdering := []string{testPredicate}
 	// tests is used to initialize all nodes
 	tests := []struct {
 		name                              string
@@ -666,7 +649,6 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
 		nodeName                          string
 		equivalenceHashForUpdatePredicate uint64
 		cachedItem                        predicateItemType
-		cache                             schedulercache.Cache
 	}{
 		{
 			name: "hash predicate 123 not fits host ports",
@@ -679,7 +661,6 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
 					predicates.ErrPodNotFitsHostPorts,
 				},
 			},
-			cache: &upToDateCache{},
 		},
 		{
 			name: "hash predicate 456 not fits host ports",
@@ -692,7 +673,6 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
 					predicates.ErrPodNotFitsHostPorts,
 				},
 			},
-			cache: &upToDateCache{},
 		},
 		{
 			name: "hash predicate 123 fits",
@@ -702,10 +682,9 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
 			cachedItem: predicateItemType{
 				fit: true,
 			},
-			cache: &upToDateCache{},
 		},
 	}
-	ecache := NewCache()
+	ecache := NewCache(predicatesOrdering)
 
 	for _, test := range tests {
 		node := schedulercache.NewNodeInfo()
@@ -717,10 +696,10 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
 		nodeCache.updateResult(
 			test.podName,
 			testPredicate,
+			testPredicateID,
 			test.cachedItem.fit,
 			test.cachedItem.reasons,
 			test.equivalenceHashForUpdatePredicate,
-			test.cache,
 			node,
 		)
 	}
@@ -731,8 +710,8 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
 	// there should be no cached predicate any more
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			if nodeCache, exist := ecache.GetNodeCache(test.nodeName); exist {
-				if _, exist := nodeCache.cache[testPredicate]; exist {
+			if nodeCache, exist := ecache.nodeToCache[test.nodeName]; exist {
+				if cache := nodeCache.cache[testPredicateID]; cache != nil {
 					t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated",
 						testPredicate, test.nodeName)
 				}
@@ -743,6 +722,8 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
 
 func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
 	testPredicate := "GeneralPredicates"
+	testPredicateID := 0
+	predicatesOrdering := []string{testPredicate}
 	// tests is used to initialize all nodes
 	tests := []struct {
 		name                              string
@@ -750,7 +731,6 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
 		nodeName                          string
 		equivalenceHashForUpdatePredicate uint64
 		cachedItem                        predicateItemType
-		cache                             schedulercache.Cache
 	}{
 		{
 			name: "hash predicate 123 not fits host ports",
@@ -761,7 +741,6 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
 				fit:     false,
 				reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
 			},
-			cache: &upToDateCache{},
 		},
 		{
 			name: "hash predicate 456 not fits host ports",
@@ -772,7 +751,6 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
 				fit:     false,
 				reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
 			},
-			cache: &upToDateCache{},
 		},
 		{
 			name: "hash predicate 123 fits host ports",
@@ -782,10 +760,9 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
 			cachedItem: predicateItemType{
 				fit: true,
 			},
-			cache: &upToDateCache{},
 		},
 	}
-	ecache := NewCache()
+	ecache := NewCache(predicatesOrdering)
 
 	for _, test := range tests {
 		node := schedulercache.NewNodeInfo()
@@ -797,19 +774,21 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
 		nodeCache.updateResult(
 			test.podName,
 			testPredicate,
+			testPredicateID,
 			test.cachedItem.fit,
 			test.cachedItem.reasons,
 			test.equivalenceHashForUpdatePredicate,
-			test.cache,
 			node,
 		)
 	}
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			oldNodeCache, _ := ecache.GetNodeCache(test.nodeName)
+			oldGeneration := oldNodeCache.generation
 			// invalidate cached predicate for all nodes
 			ecache.InvalidateAllPredicatesOnNode(test.nodeName)
-			if _, ok := ecache.GetNodeCache(test.nodeName); ok {
+			if n, _ := ecache.GetNodeCache(test.nodeName); oldGeneration == n.generation {
 				t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName)
 			}
 		})

View File

@@ -111,6 +111,25 @@ type genericScheduler struct {
 	percentageOfNodesToScore int32
 }
 
+// snapshot snapshots equivalence cache and node infos for all fit and priority
+// functions.
+func (g *genericScheduler) snapshot() error {
+	// IMPORTANT NOTE: We must snapshot equivalence cache before snapshotting
+	// scheduler cache, otherwise stale data may be written into equivalence
+	// cache, e.g.
+	// 1. snapshot cache
+	// 2. event arrives, updating cache and invalidating predicates or whole node cache
+	// 3. snapshot ecache
+	// 4. evaluate predicates
+	// 5. stale result will be written to ecache
+	if g.equivalenceCache != nil {
+		g.equivalenceCache.Snapshot()
+	}
+
+	// Used for all fit and priority funcs.
+	return g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
+}
+
 // Schedule tries to schedule the given pod to one of the nodes in the node list.
 // If it succeeds, it will return the name of the node.
 // If it fails, it will return a FitError error with reasons.
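The IMPORTANT NOTE above spells out the failing interleaving when the snapshots are taken in the wrong order. The following self-contained toy (simplified names, not the real scheduler types) makes the difference concrete: with the fixed order, a concurrent invalidation always forces the eventual equivalence-cache write to be dropped, while the reversed order can accept a result computed from stale node data.

package main

import "fmt"

// state models the two things a cycle snapshots: gen is the equivalence cache
// generation, data is the scheduler cache content. An event updates the cache
// first and then bumps the generation, mirroring the event handlers.
type state struct{ data, gen int }

func (s *state) event() { s.data++; s.gen++ }

// fixedCycle snapshots the ecache generation first (the order enforced above).
// It returns true if a cache write at the end of the cycle would be accepted.
func fixedCycle(s *state, eventBetween bool) bool {
	snapGen := s.gen // 1. snapshot equivalence cache generations
	if eventBetween {
		s.event()
	}
	_ = s.data              // 2. snapshot scheduler cache, 3. run predicates
	return snapGen == s.gen // 4. write only if no invalidation happened
}

// brokenCycle snapshots the node data first. An event between the two
// snapshots leaves the generation looking current while the data snapshot is
// stale, so the stale result would be cached.
func brokenCycle(s *state, eventBetween bool) bool {
	snapData := s.data
	if eventBetween {
		s.event()
	}
	snapGen := s.gen
	_ = snapData            // predicates ran against stale data
	return snapGen == s.gen // true: stale write slips in
}

func main() {
	fmt.Println(fixedCycle(&state{}, true))  // false: stale write dropped
	fmt.Println(brokenCycle(&state{}, true)) // true: stale write accepted
}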
@@ -130,8 +149,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 		return "", ErrNoNodesAvailable
 	}
 
-	// Used for all fit and priority funcs.
-	err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
+	err = g.snapshot()
 	if err != nil {
 		return "", err
 	}
@@ -227,7 +245,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
 	if !ok || fitError == nil {
 		return nil, nil, nil, nil
 	}
-	err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
+	err := g.snapshot()
 	if err != nil {
 		return nil, nil, nil, err
 	}
@@ -396,14 +414,13 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
 			var nodeCache *equivalence.NodeCache
 			nodeName := g.cache.NodeTree().Next()
 			if g.equivalenceCache != nil {
-				nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
+				nodeCache = g.equivalenceCache.LoadNodeCache(nodeName)
 			}
 			fits, failedPredicates, err := podFitsOnNode(
 				pod,
 				meta,
 				g.cachedNodeInfoMap[nodeName],
 				g.predicates,
-				g.cache,
 				nodeCache,
 				g.schedulingQueue,
 				g.alwaysCheckAllPredicates,
@@ -516,7 +533,6 @@ func podFitsOnNode(
 	meta algorithm.PredicateMetadata,
 	info *schedulercache.NodeInfo,
 	predicateFuncs map[string]algorithm.FitPredicate,
-	cache schedulercache.Cache,
 	nodeCache *equivalence.NodeCache,
 	queue SchedulingQueue,
 	alwaysCheckAllPredicates bool,
@@ -558,7 +574,7 @@ func podFitsOnNode(
 		// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
 		// when pods are nominated or their nominations change.
 		eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
-		for _, predicateKey := range predicates.Ordering() {
+		for predicateID, predicateKey := range predicates.Ordering() {
 			var (
 				fit     bool
 				reasons []algorithm.PredicateFailureReason
@@ -567,7 +583,7 @@ func podFitsOnNode(
 			//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
 			if predicate, exist := predicateFuncs[predicateKey]; exist {
 				if eCacheAvailable {
-					fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
+					fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
 				} else {
 					fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
 				}
@@ -991,7 +1007,7 @@ func selectVictimsOnNode(
 	// that we should check is if the "pod" is failing to schedule due to pod affinity
 	// failure.
 	// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
-	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil); !fits {
+	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
 		if err != nil {
 			glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
 		}
@@ -1005,7 +1021,7 @@ func selectVictimsOnNode(
 	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
 	reprievePod := func(p *v1.Pod) bool {
 		addPod(p)
-		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, nil, queue, false, nil)
+		fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
 		if !fits {
 			removePod(p)
 			victims = append(victims, p)

View File

@@ -1474,7 +1474,10 @@ func TestCacheInvalidationRace(t *testing.T) {
 		cacheInvalidated: make(chan struct{}),
 	}
 
-	eCache := equivalence.NewCache()
+	ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
+	algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
+	eCache := equivalence.NewCache(algorithmpredicates.Ordering())
+	eCache.GetNodeCache(testNode.Name)
 	// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
 	// the equivalence cache would be updated.
 	go func() {
@@ -1490,8 +1493,6 @@ func TestCacheInvalidationRace(t *testing.T) {
 	}()
 
 	// Set up the scheduler.
-	ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
-	algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
 	prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
 	pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
 	scheduler := NewGenericScheduler(
@@ -1522,3 +1523,84 @@ func TestCacheInvalidationRace(t *testing.T) {
 		t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
 	}
 }
+
+// TestCacheInvalidationRace2 tests that cache invalidation is correctly handled
+// when an invalidation event happens while a predicate is running.
+func TestCacheInvalidationRace2(t *testing.T) {
+	// Create a predicate that returns false the first time and true on subsequent calls.
+	var (
+		podWillFit       = false
+		callCount        int
+		cycleStart       = make(chan struct{})
+		cacheInvalidated = make(chan struct{})
+		once             sync.Once
+	)
+	testPredicate := func(pod *v1.Pod,
+		meta algorithm.PredicateMetadata,
+		nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
+		callCount++
+		once.Do(func() {
+			cycleStart <- struct{}{}
+			<-cacheInvalidated
+		})
+		if !podWillFit {
+			podWillFit = true
+			return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
+		}
+		return true, nil, nil
+	}
+
+	// Set up the mock cache.
+	cache := schedulercache.New(time.Duration(0), wait.NeverStop)
+	testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
+	cache.AddNode(testNode)
+
+	ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
+	algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
+	eCache := equivalence.NewCache(algorithmpredicates.Ordering())
+	eCache.GetNodeCache(testNode.Name)
+
+	// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
+	// the equivalence cache would be updated.
+	go func() {
+		<-cycleStart
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"},
+			Spec:       v1.PodSpec{NodeName: "machine1"}}
+		if err := cache.AddPod(pod); err != nil {
+			t.Errorf("Could not add pod to cache: %v", err)
+		}
+		eCache.InvalidateAllPredicatesOnNode("machine1")
+		cacheInvalidated <- struct{}{}
+	}()
+
+	// Set up the scheduler.
+	prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
+	pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
+	scheduler := NewGenericScheduler(
+		cache,
+		eCache,
+		NewSchedulingQueue(),
+		ps,
+		algorithm.EmptyPredicateMetadataProducer,
+		prioritizers,
+		algorithm.EmptyPriorityMetadataProducer,
+		nil, nil, pvcLister, true, false,
+		schedulerapi.DefaultPercentageOfNodesToScore)
+
+	// First scheduling attempt should fail.
+	nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
+	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
+	machine, err := scheduler.Schedule(pod, nodeLister)
+	if machine != "" || err == nil {
+		t.Error("First scheduling attempt did not fail")
+	}
+
+	// Second scheduling attempt should succeed because cache was invalidated.
+	_, err = scheduler.Schedule(pod, nodeLister)
+	if err != nil {
+		t.Errorf("Second scheduling attempt failed: %v", err)
+	}
+	if callCount != 2 {
+		t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
+	}
+}

View File

@@ -733,9 +733,11 @@ func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
 		return
 	}
 
-	// NOTE: Because the scheduler uses snapshots of schedulerCache and the live
-	// version of equivalencePodCache, updates must be written to schedulerCache
-	// before invalidating equivalencePodCache.
+	// NOTE: Updates must be written to scheduler cache before invalidating
+	// equivalence cache, because we could snapshot equivalence cache after the
+	// invalidation and then snapshot the cache itself. If the cache is
+	// snapshotted before updates are written, we would update equivalence
+	// cache with stale information which is based on snapshot of old cache.
 	if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
 		glog.Errorf("scheduler cache UpdatePod failed: %v", err)
 	}
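This NOTE, repeated for the other event handlers below, is the handler-side half of the ordering argument; the sketch after the genericScheduler.snapshot hunk covered the cycle side. Another self-contained toy, again with simplified stand-in names, shows what would go wrong if a handler invalidated the equivalence cache before writing the scheduler cache: a cycle interleaved with the handler could cache a result computed from pre-event node data under a generation that still looks current.

package main

import "fmt"

// caches models the scheduler cache content (data) and the equivalence cache
// generation (gen); simplified stand-ins, not the real types.
type caches struct{ data, gen int }

// raceCycle runs one scheduling cycle while a handler is in flight: the
// handler's first store has already landed when the cycle snapshots, its
// second store lands afterwards. It returns true if the cycle would cache a
// result computed from pre-event node data under a still-current generation.
func raceCycle(step1, step2 func(*caches)) bool {
	c := &caches{}
	step1(c)           // handler begins
	snapGen := c.gen   // cycle: snapshot equivalence cache generation
	snapData := c.data // cycle: snapshot scheduler cache, run predicates
	step2(c)           // handler finishes
	writeAccepted := snapGen == c.gen
	dataWasStale := snapData < c.data
	return writeAccepted && dataWasStale
}

func main() {
	// Good order (as in the handlers above): cache write first, invalidation second.
	fmt.Println(raceCycle(
		func(c *caches) { c.data++ },
		func(c *caches) { c.gen++ },
	)) // false: the late invalidation bumps the generation, so the write is dropped

	// Bad order: invalidation first, cache write second.
	fmt.Println(raceCycle(
		func(c *caches) { c.gen++ },
		func(c *caches) { c.data++ },
	)) // true: generation already matches, stale node data gets cached
}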
@@ -822,9 +824,11 @@ func (c *configFactory) deletePodFromCache(obj interface{}) {
 		glog.Errorf("cannot convert to *v1.Pod: %v", t)
 		return
 	}
-	// NOTE: Because the scheduler uses snapshots of schedulerCache and the live
-	// version of equivalencePodCache, updates must be written to schedulerCache
-	// before invalidating equivalencePodCache.
+	// NOTE: Updates must be written to scheduler cache before invalidating
+	// equivalence cache, because we could snapshot equivalence cache after the
+	// invalidation and then snapshot the cache itself. If the cache is
+	// snapshotted before updates are written, we would update equivalence
+	// cache with stale information which is based on snapshot of old cache.
 	if err := c.schedulerCache.RemovePod(pod); err != nil {
 		glog.Errorf("scheduler cache RemovePod failed: %v", err)
 	}
@@ -861,15 +865,17 @@ func (c *configFactory) addNodeToCache(obj interface{}) {
 		return
 	}
 
-	if err := c.schedulerCache.AddNode(node); err != nil {
-		glog.Errorf("scheduler cache AddNode failed: %v", err)
-	}
-
+	// NOTE: Because the scheduler uses equivalence cache for nodes, we need
+	// to create it before adding node into scheduler cache.
 	if c.enableEquivalenceClassCache {
 		// GetNodeCache() will lazily create NodeCache for given node if it does not exist.
 		c.equivalencePodCache.GetNodeCache(node.GetName())
 	}
 
+	if err := c.schedulerCache.AddNode(node); err != nil {
+		glog.Errorf("scheduler cache AddNode failed: %v", err)
+	}
+
 	c.podQueue.MoveAllToActiveQueue()
 	// NOTE: add a new node does not affect existing predicates in equivalence cache
 }
@@ -886,9 +892,11 @@ func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
 		return
 	}
 
-	// NOTE: Because the scheduler uses snapshots of schedulerCache and the live
-	// version of equivalencePodCache, updates must be written to schedulerCache
-	// before invalidating equivalencePodCache.
+	// NOTE: Updates must be written to scheduler cache before invalidating
+	// equivalence cache, because we could snapshot equivalence cache after the
+	// invalidation and then snapshot the cache itself. If the cache is
+	// snapshotted before updates are written, we would update equivalence
+	// cache with stale information which is based on snapshot of old cache.
 	if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
 		glog.Errorf("scheduler cache UpdateNode failed: %v", err)
 	}
@@ -982,9 +990,11 @@ func (c *configFactory) deleteNodeFromCache(obj interface{}) {
 		glog.Errorf("cannot convert to *v1.Node: %v", t)
 		return
 	}
-	// NOTE: Because the scheduler uses snapshots of schedulerCache and the live
-	// version of equivalencePodCache, updates must be written to schedulerCache
-	// before invalidating equivalencePodCache.
+	// NOTE: Updates must be written to scheduler cache before invalidating
+	// equivalence cache, because we could snapshot equivalence cache after the
+	// invalidation and then snapshot the cache itself. If the cache is
+	// snapshotted before updates are written, we would update equivalence
+	// cache with stale information which is based on snapshot of old cache.
 	if err := c.schedulerCache.RemoveNode(node); err != nil {
 		glog.Errorf("scheduler cache RemoveNode failed: %v", err)
 	}
@@ -1178,7 +1188,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 	// Init equivalence class cache
 	if c.enableEquivalenceClassCache {
-		c.equivalencePodCache = equivalence.NewCache()
+		c.equivalencePodCache = equivalence.NewCache(predicates.Ordering())
 		glog.Info("Created equivalence class cache")
 	}
 
@@ -1414,9 +1424,11 @@ func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue
 				_, err := c.client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
 				if err != nil && errors.IsNotFound(err) {
 					node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}
-					// NOTE: Because the scheduler uses snapshots of schedulerCache and the live
-					// version of equivalencePodCache, updates must be written to schedulerCache
-					// before invalidating equivalencePodCache.
+					// NOTE: Updates must be written to scheduler cache before invalidating
+					// equivalence cache, because we could snapshot equivalence cache after the
+					// invalidation and then snapshot the cache itself. If the cache is
+					// snapshotted before updates are written, we would update equivalence
+					// cache with stale information which is based on snapshot of old cache.
 					c.schedulerCache.RemoveNode(&node)
 					// invalidate cached predicate for the node
 					if c.enableEquivalenceClassCache {

View File

@@ -329,9 +329,11 @@ func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
 	// If the binding fails, scheduler will release resources allocated to assumed pod
 	// immediately.
 	assumed.Spec.NodeName = host
-	// NOTE: Because the scheduler uses snapshots of SchedulerCache and the live
-	// version of Ecache, updates must be written to SchedulerCache before
-	// invalidating Ecache.
+	// NOTE: Updates must be written to scheduler cache before invalidating
+	// equivalence cache, because we could snapshot equivalence cache after the
+	// invalidation and then snapshot the cache itself. If the cache is
+	// snapshotted before updates are written, we would update equivalence
+	// cache with stale information which is based on snapshot of old cache.
 	if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
 		glog.Errorf("scheduler cache AssumePod failed: %v", err)

View File

@@ -106,8 +106,5 @@ func (f *FakeCache) Snapshot() *schedulercache.Snapshot {
 	return &schedulercache.Snapshot{}
 }
 
-// IsUpToDate is a fake method for testing
-func (f *FakeCache) IsUpToDate(*schedulercache.NodeInfo) bool { return true }
-
 // NodeTree is a fake method for testing.
 func (f *FakeCache) NodeTree() *schedulercache.NodeTree { return nil }