Merge pull request #65714 from resouer/fix-63784

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Re-design equivalence class cache to two level cache

**What this PR does / why we need it**:

The current ecache introduces a global lock shared across all nodes; this patch assigns an ecache per node to eliminate that global lock. The resulting improvements in scheduling performance and throughput are both significant.
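
For context, a minimal sketch of the pre-patch layout, reconstructed from the lines this PR removes in the diff below (the package name and the plain-string `FailReasons` are simplifications, not the upstream code): a single `Cache` keyed by node name and guarded by one `sync.RWMutex`, so predicate lookups for different nodes all contend on the same lock.

```go
package equivsketch

import "sync"

// predicateResult is simplified here; upstream stores
// algorithm.PredicateFailureReason values instead of strings.
type predicateResult struct {
	Fit         bool
	FailReasons []string
}

type resultMap map[uint64]predicateResult // equivalence hash -> cached result
type predicateMap map[string]resultMap    // predicate name   -> results
type nodeMap map[string]predicateMap      // node name        -> per-predicate results

// Cache (pre-patch shape): one lock guards the cached results of every node,
// so concurrent predicate evaluations on different nodes serialize here.
type Cache struct {
	mu    sync.RWMutex
	cache nodeMap
}
```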

**CPU Profile Result** 

Machine: 32-core 60GB GCE VM

1k-node / 10k-pod benchmark (the critical function is highlighted in each profile):

1. Current default scheduler with ecache enabled:
![equivalence class cache bench test 001](https://user-images.githubusercontent.com/1701782/42196992-51b0a32a-7eb3-11e8-89ee-f13383091a00.jpeg)
2. Current default scheduler with ecache disabled:
![equivalence class cache bench test 002](https://user-images.githubusercontent.com/1701782/42196993-51eb0c68-7eb3-11e8-9326-1a7762072863.jpeg)
3. Current default scheduler with this patch and ecache enabled:
![equivalence class cache bench test 003](https://user-images.githubusercontent.com/1701782/42196994-52280ed8-7eb3-11e8-8100-690e2af2cf2f.jpeg)

**Throughput Test Result** 

1k-node / 3k-pod `scheduler_perf` test:

Current default scheduler, ecache is disabled:
```bash
Minimal observed throughput for 3k pod test: 200
PASS
ok      k8s.io/kubernetes/test/integration/scheduler_perf    30.091s
```
With this patch, ecache is enabled:
```bash
Minimal observed throughput for 3k pod test: 556
PASS
ok      k8s.io/kubernetes/test/integration/scheduler_perf    11.119s
```

**Design and implementation:**

The idea: we re-designed the ecache as a two-level cache.

The first-level cache holds the global lock across nodes; it only needs to be synchronized when a node is added or deleted, which happens far less frequently.

The second-level cache is assigned per node and its lock is scoped to that node, so the global lock is never touched during the predicate evaluation cycle. For more detail, please check [the original discussion](https://github.com/kubernetes/kubernetes/issues/63784#issuecomment-399848349).
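
A minimal, self-contained sketch of the two-level layout, mirroring the types added in the diff below (the package name, the plain-string `FailReasons`, and the flattened `predicateMap` type are simplifications of the upstream definitions):

```go
package equivsketch

import "sync"

// predicateResult is simplified here; upstream stores
// algorithm.PredicateFailureReason values instead of strings.
type predicateResult struct {
	Fit         bool
	FailReasons []string
}

// predicateMap: predicate name -> equivalence hash -> cached result.
type predicateMap map[string]map[uint64]predicateResult

// NodeCache is the second level: the cached results for a single node,
// guarded by a lock that is private to that node.
type NodeCache struct {
	mu    sync.RWMutex
	cache predicateMap
}

func newNodeCache() *NodeCache {
	return &NodeCache{cache: make(predicateMap)}
}

// Cache is the first level: node name -> *NodeCache. Its lock is taken only
// when a NodeCache is fetched or a node is added/removed, not on every
// predicate evaluation.
type Cache struct {
	mu          sync.RWMutex
	nodeToCache map[string]*NodeCache
}

// GetNodeCache lazily creates the NodeCache for a node, mirroring the
// GetNodeCache added in the diff. The bool is true if the entry already existed.
func (c *Cache) GetNodeCache(name string) (*NodeCache, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	nc, exists := c.nodeToCache[name]
	if !exists {
		nc = newNodeCache()
		c.nodeToCache[name] = nc
	}
	return nc, exists
}
```

With this split, `findNodesThatFit` fetches the per-node `NodeCache` once via `GetNodeCache` and hands it to `podFitsOnNode`, so only that node's lock is held while its predicates run; the first-level lock is touched only on node add/delete and on the lazy lookup itself.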

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #63784

**Special notes for your reviewer**:

~~Tagged as WIP to make sure this does not break existing code and tests, we can start review after CI is happy.~~

**Release note**:

```release-note
Re-design equivalence class cache to two level cache
```
Merged by Kubernetes Submit Queue on 2018-07-19 16:16:02 -07:00 (committed by GitHub)
commit 795b7da8b0
6 changed files with 214 additions and 144 deletions

View File

@ -52,7 +52,7 @@ type schedulerCache struct {
period time.Duration
// This mutex guards all fields within this cache struct.
mu sync.Mutex
mu sync.RWMutex
// a set of assumed pod keys.
// The key could further be used to get an entry in podStates.
assumedPods map[string]bool
@ -112,8 +112,8 @@ func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedul
// Snapshot takes a snapshot of the current schedulerCache. The method has performance impact,
// and should be only used in non-critical path.
func (cache *schedulerCache) Snapshot() *Snapshot {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
nodes := make(map[string]*NodeInfo)
for k, v := range cache.nodes {
@ -164,8 +164,8 @@ func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
}
func (cache *schedulerCache) FilteredList(podFilter PodFilter, selector labels.Selector) ([]*v1.Pod, error) {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
// podFilter is expected to return true for most or all of the pods. We
// can avoid expensive array growth without wasting too much memory by
// pre-allocating capacity.
@ -216,8 +216,8 @@ func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
return err
}
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
glog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
currState, ok := cache.podStates[key]
@ -387,8 +387,8 @@ func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
return false, err
}
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
b, found := cache.assumedPods[key]
if !found {
@ -403,8 +403,8 @@ func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
return nil, err
}
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
podState, ok := cache.podStates[key]
if !ok {
@ -539,8 +539,8 @@ func (cache *schedulerCache) RemovePDB(pdb *policy.PodDisruptionBudget) error {
}
func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDisruptionBudget, error) {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
var pdbs []*policy.PodDisruptionBudget
for _, pdb := range cache.pdbs {
if selector.Matches(labels.Set(pdb.Labels)) {
@ -551,8 +551,8 @@ func (cache *schedulerCache) ListPDBs(selector labels.Selector) ([]*policy.PodDi
}
func (cache *schedulerCache) IsUpToDate(n *NodeInfo) bool {
cache.mu.Lock()
defer cache.mu.Unlock()
cache.mu.RLock()
defer cache.mu.RUnlock()
node, ok := cache.nodes[n.Node().Name]
return ok && n.generation == node.generation
}
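
The hunks above are the complementary locking change in the scheduler's internal cache: the struct-wide `sync.Mutex` becomes a `sync.RWMutex`, and a number of accessors switch from the exclusive lock to the read lock so concurrent readers no longer serialize. A minimal sketch of that pattern (the type and method names here are illustrative, not the upstream ones):

```go
package cachesketch

import "sync"

// store illustrates the Mutex -> RWMutex pattern: writers take the exclusive
// lock, readers take the shared lock, so reads do not block each other.
type store struct {
	mu    sync.RWMutex
	items map[string]string
}

func newStore() *store {
	return &store{items: make(map[string]string)}
}

// set mutates state, so it needs the exclusive (write) lock.
func (s *store) set(k, v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[k] = v
}

// get only reads state, so the shared (read) lock is enough.
func (s *store) get(k string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.items[k]
	return v, ok
}
```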

View File

@ -33,25 +33,136 @@ import (
"github.com/golang/glog"
)
// Cache saves and reuses the output of predicate functions. Use RunPredicate to
// get or update the cached results. An appropriate Invalidate* function should
// be called when some predicate results are no longer valid.
// nodeMap stores a *NodeCache for each node.
type nodeMap map[string]*NodeCache
// Cache is a thread-safe map that saves and reuses the output of predicate functions.
// It uses the node name as the key to access the cached results for each node.
//
// Internally, results are keyed by node name, predicate name, and "equivalence
// Internally, results are keyed by predicate name, and "equivalence
// class". (Equivalence class is defined in the `Class` type.) Saved results
// will be reused until an appropriate invalidation function is called.
type Cache struct {
mu sync.RWMutex
cache nodeMap
// NOTE(harry): Theoretically sync.Map has better performance on machines with 8+ CPUs, but
// in practice lock contention on the first-level cache is rare.
mu sync.RWMutex
nodeToCache nodeMap
}
// NewCache returns an empty Cache.
// NewCache creates an empty equivalence class cache.
func NewCache() *Cache {
return &Cache{
cache: make(nodeMap),
nodeToCache: make(nodeMap),
}
}
// NodeCache saves and reuses the output of predicate functions. Use RunPredicate to
// get or update the cached results. An appropriate Invalidate* function should
// be called when some predicate results are no longer valid.
//
// Internally, results are keyed by predicate name, and "equivalence
// class". (Equivalence class is defined in the `Class` type.) Saved results
// will be reused until an appropriate invalidation function is called.
//
// NodeCache objects are thread safe within the context of a single NodeCache.
type NodeCache struct {
mu sync.RWMutex
cache predicateMap
}
// newNodeCache returns an empty NodeCache.
func newNodeCache() *NodeCache {
return &NodeCache{
cache: make(predicateMap),
}
}
// GetNodeCache returns the existing NodeCache for given node if present. Otherwise,
// it creates the NodeCache and returns it.
// The boolean flag is true if the value was loaded, false if created.
func (c *Cache) GetNodeCache(name string) (nodeCache *NodeCache, exists bool) {
c.mu.Lock()
defer c.mu.Unlock()
if nodeCache, exists = c.nodeToCache[name]; !exists {
nodeCache = newNodeCache()
c.nodeToCache[name] = nodeCache
}
return
}
// InvalidatePredicates clears all cached results for the given predicates.
func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
c.mu.RLock()
defer c.mu.RUnlock()
for _, n := range c.nodeToCache {
n.invalidatePreds(predicateKeys)
}
glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
}
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
c.mu.RLock()
defer c.mu.RUnlock()
if n, ok := c.nodeToCache[nodeName]; ok {
n.invalidatePreds(predicateKeys)
}
glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
}
// InvalidateAllPredicatesOnNode clears all cached results for one node.
func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.nodeToCache, nodeName)
glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
}
// InvalidateCachedPredicateItemForPodAdd is a wrapper of
// InvalidateCachedPredicateItem for pod add case
// TODO: This does not belong with the equivalence cache implementation.
func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
// MatchInterPodAffinity: we assume the scheduler ensures that a newly bound pod
// will not break existing inter-pod affinity, so we do not need to
// invalidate MatchInterPodAffinity when a pod is added.
//
// But when a pod is deleted, existing inter-pod affinity may become invalid
// (e.g. this pod was preferred by someone else, or vice versa).
//
// NOTE: the assumptions above will not hold once we implement features like
// RequiredDuringSchedulingRequiredDuringExecution.
// NoDiskConflict: if the newly scheduled pod fits alongside the existing pods on this node,
// it will also fit the equivalence class of the existing pods.
// GeneralPredicates: always affected by adding a new pod.
invalidPredicates := sets.NewString(predicates.GeneralPred)
// MaxPDVolumeCountPredicate: we check the volumes of the pod to make the decision.
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred)
} else {
if vol.AWSElasticBlockStore != nil {
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
}
if vol.GCEPersistentDisk != nil {
invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
}
if vol.AzureDisk != nil {
invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
}
}
}
c.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
}
// Class represents a set of pods which are equivalent from the perspective of
// the scheduler. i.e. the scheduler would make the same decision for any pod
// from the same class.
@ -78,9 +189,6 @@ func NewClass(pod *v1.Pod) *Class {
return nil
}
// nodeMap stores PredicateCaches with node name as the key.
type nodeMap map[string]predicateMap
// predicateMap stores resultMaps with predicate name as the key.
type predicateMap map[string]resultMap
@ -97,7 +205,7 @@ type predicateResult struct {
// run and its results cached for the next call.
//
// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
func (c *Cache) RunPredicate(
func (n *NodeCache) RunPredicate(
pred algorithm.FitPredicate,
predicateKey string,
pod *v1.Pod,
@ -111,7 +219,7 @@ func (c *Cache) RunPredicate(
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
}
result, ok := c.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash)
result, ok := n.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash)
if ok {
return result.Fit, result.FailReasons, nil
}
@ -120,13 +228,13 @@ func (c *Cache) RunPredicate(
return fit, reasons, err
}
if cache != nil {
c.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo)
n.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo)
}
return fit, reasons, nil
}
// updateResult updates the cached result of a predicate.
func (c *Cache) updateResult(
func (n *NodeCache) updateResult(
podName, predicateKey string,
fit bool,
reasons []algorithm.PredicateFailureReason,
@ -134,8 +242,6 @@ func (c *Cache) updateResult(
cache schedulercache.Cache,
nodeInfo *schedulercache.NodeInfo,
) {
c.mu.Lock()
defer c.mu.Unlock()
if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests.
return
@ -144,114 +250,48 @@ func (c *Cache) updateResult(
if !cache.IsUpToDate(nodeInfo) {
return
}
nodeName := nodeInfo.Node().GetName()
if _, exist := c.cache[nodeName]; !exist {
c.cache[nodeName] = make(predicateMap)
}
predicateItem := predicateResult{
Fit: fit,
FailReasons: reasons,
}
// if cached predicate map already exists, just update the predicate by key
if predicates, ok := c.cache[nodeName][predicateKey]; ok {
n.mu.Lock()
defer n.mu.Unlock()
// If cached predicate map already exists, just update the predicate by key
if predicates, ok := n.cache[predicateKey]; ok {
// maps in golang are references, no need to add them back
predicates[equivalenceHash] = predicateItem
} else {
c.cache[nodeName][predicateKey] =
n.cache[predicateKey] =
resultMap{
equivalenceHash: predicateItem,
}
}
glog.V(5).Infof("Cache update: node=%s,predicate=%s,pod=%s,value=%v", nodeName, predicateKey, podName, predicateItem)
glog.V(5).Infof("Cache update: node=%s, predicate=%s,pod=%s,value=%v",
nodeInfo.Node().Name, predicateKey, podName, predicateItem)
}
// lookupResult returns cached predicate results and a bool saying whether a
// cache entry was found.
func (c *Cache) lookupResult(
func (n *NodeCache) lookupResult(
podName, nodeName, predicateKey string,
equivalenceHash uint64,
) (value predicateResult, ok bool) {
c.mu.RLock()
defer c.mu.RUnlock()
glog.V(5).Infof("Cache lookup: node=%s,predicate=%s,pod=%s", nodeName, predicateKey, podName)
value, ok = c.cache[nodeName][predicateKey][equivalenceHash]
n.mu.RLock()
defer n.mu.RUnlock()
value, ok = n.cache[predicateKey][equivalenceHash]
return value, ok
}
// InvalidatePredicates clears all cached results for the given predicates.
func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
c.mu.Lock()
defer c.mu.Unlock()
// c.cache uses nodeName as key, so we just iterate it and invalid given predicates
for _, predicates := range c.cache {
for predicateKey := range predicateKeys {
delete(predicates, predicateKey)
}
}
glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
}
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
c.mu.Lock()
defer c.mu.Unlock()
// invalidatePreds deletes cached predicates by given keys.
func (n *NodeCache) invalidatePreds(predicateKeys sets.String) {
n.mu.Lock()
defer n.mu.Unlock()
for predicateKey := range predicateKeys {
delete(c.cache[nodeName], predicateKey)
delete(n.cache, predicateKey)
}
glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
}
// InvalidateAllPredicatesOnNode clears all cached results for one node.
func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.cache, nodeName)
glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
}
// InvalidateCachedPredicateItemForPodAdd is a wrapper of
// InvalidateCachedPredicateItem for pod add case
// TODO: This does not belong with the equivalence cache implementation.
func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
// will not break the existing inter pod affinity. So we does not need to
// invalidate MatchInterPodAffinity when pod added.
//
// But when a pod is deleted, existing inter pod affinity may become invalid.
// (e.g. this pod was preferred by some else, or vice versa)
//
// NOTE: assumptions above will not stand when we implemented features like
// RequiredDuringSchedulingRequiredDuringExecution.
// NoDiskConflict: the newly scheduled pod fits to existing pods on this node,
// it will also fits to equivalence class of existing pods
// GeneralPredicates: will always be affected by adding a new pod
invalidPredicates := sets.NewString(predicates.GeneralPred)
// MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred)
} else {
if vol.AWSElasticBlockStore != nil {
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
}
if vol.GCEPersistentDisk != nil {
invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
}
if vol.AzureDisk != nil {
invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
}
}
}
c.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
}
// equivalencePod is the set of pod attributes which must match for two pods to

View File

@ -243,17 +243,21 @@ func TestRunPredicate(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}})
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}
node.SetNode(testNode)
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}}
meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
// Initialize and populate equivalence class cache.
ecache := NewCache()
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
equivClass := NewClass(pod)
if test.expectCacheHit {
ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
nodeCache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
}
fit, reasons, err := ecache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
fit, reasons, err := nodeCache.RunPredicate(test.pred.predicate, "testPredicate", pod, meta, node, equivClass, test.cache)
if err != nil {
if err.Error() != test.expectedError {
@ -284,7 +288,7 @@ func TestRunPredicate(t *testing.T) {
if !test.expectCacheHit && test.pred.callCount == 0 {
t.Errorf("Predicate should be called")
}
_, ok := ecache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
_, ok := nodeCache.lookupResult(pod.Name, node.Node().Name, "testPredicate", equivClass.hash)
if !ok && test.expectCacheWrite {
t.Errorf("Cache write should happen")
}
@ -339,21 +343,25 @@ func TestUpdateResult(t *testing.T) {
},
}
for _, test := range tests {
node := schedulercache.NewNodeInfo()
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
node.SetNode(testNode)
// Initialize and populate equivalence class cache.
ecache := NewCache()
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
if test.expectPredicateMap {
ecache.cache[test.nodeName] = make(predicateMap)
predicateItem := predicateResult{
Fit: true,
}
ecache.cache[test.nodeName][test.predicateKey] =
nodeCache.cache[test.predicateKey] =
resultMap{
test.equivalenceHash: predicateItem,
}
}
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
ecache.updateResult(
nodeCache.updateResult(
test.pod,
test.predicateKey,
test.fit,
@ -363,7 +371,7 @@ func TestUpdateResult(t *testing.T) {
node,
)
cachedMapItem, ok := ecache.cache[test.nodeName][test.predicateKey]
cachedMapItem, ok := nodeCache.cache[test.predicateKey]
if !ok {
t.Errorf("Failed: %s, can't find expected cache item: %v",
test.name, test.expectCacheItem)
@ -473,11 +481,16 @@ func TestLookupResult(t *testing.T) {
}
for _, test := range tests {
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
// Initialize and populate equivalence class cache.
ecache := NewCache()
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
node.SetNode(testNode)
// set cached item to equivalence cache
ecache.updateResult(
nodeCache.updateResult(
test.podName,
test.predicateKey,
test.cachedItem.fit,
@ -493,7 +506,7 @@ func TestLookupResult(t *testing.T) {
ecache.InvalidatePredicatesOnNode(test.nodeName, predicateKeys)
}
// calculate predicate with equivalence cache
result, ok := ecache.lookupResult(test.podName,
result, ok := nodeCache.lookupResult(test.podName,
test.nodeName,
test.predicateKey,
test.equivalenceHashForCalPredicate,
@ -689,9 +702,12 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
for _, test := range tests {
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
node.SetNode(testNode)
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
// set cached item to equivalence cache
ecache.updateResult(
nodeCache.updateResult(
test.podName,
testPredicate,
test.cachedItem.fit,
@ -707,8 +723,8 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
// there should be no cached predicate any more
for _, test := range tests {
if algorithmCache, exist := ecache.cache[test.nodeName]; exist {
if _, exist := algorithmCache[testPredicate]; exist {
if nodeCache, exist := ecache.nodeToCache[test.nodeName]; exist {
if _, exist := nodeCache.cache[testPredicate]; exist {
t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated",
testPredicate, test.nodeName)
break
@ -761,9 +777,12 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
for _, test := range tests {
node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}
node.SetNode(testNode)
nodeCache, _ := ecache.GetNodeCache(testNode.Name)
// set cached item to equivalence cache
ecache.updateResult(
nodeCache.updateResult(
test.podName,
testPredicate,
test.cachedItem.fit,
@ -775,10 +794,10 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
}
for _, test := range tests {
// invalidate cached predicate for all nodes
// invalidate all cached predicate for node
ecache.InvalidateAllPredicatesOnNode(test.nodeName)
if _, exist := ecache.cache[test.nodeName]; exist {
t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName)
if _, ok := ecache.GetNodeCache(test.nodeName); ok {
t.Errorf("Failed: node: %v should not be found in internal cache", test.nodeName)
break
}
}

View File

@ -356,20 +356,25 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
var equivClass *equivalence.Class
if g.equivalenceCache != nil {
// getEquivalenceClassInfo will return immediately if no equivalence pod found
equivClass = equivalence.NewClass(pod)
}
checkNode := func(i int) {
var nodeCache *equivalence.NodeCache
nodeName := nodes[i].Name
if g.equivalenceCache != nil {
nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
}
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
g.cachedNodeInfoMap[nodeName],
g.predicates,
g.cache,
g.equivalenceCache,
nodeCache,
g.schedulingQueue,
g.alwaysCheckAllPredicates,
equivClass,
@ -472,7 +477,7 @@ func podFitsOnNode(
info *schedulercache.NodeInfo,
predicateFuncs map[string]algorithm.FitPredicate,
cache schedulercache.Cache,
ecache *equivalence.Cache,
nodeCache *equivalence.NodeCache,
queue SchedulingQueue,
alwaysCheckAllPredicates bool,
equivClass *equivalence.Class,
@ -512,7 +517,7 @@ func podFitsOnNode(
// Bypass eCache if node has any nominated pods.
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
// when pods are nominated or their nominations change.
eCacheAvailable = equivClass != nil && !podsAdded
eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
for _, predicateKey := range predicates.Ordering() {
var (
fit bool
@ -522,7 +527,7 @@ func podFitsOnNode(
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
if predicate, exist := predicateFuncs[predicateKey]; exist {
if eCacheAvailable {
fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
} else {
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
}

View File

@ -1405,7 +1405,8 @@ func TestCacheInvalidationRace(t *testing.T) {
// Set up the mock cache.
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}})
testNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}}
cache.AddNode(testNode)
mockCache := &syncingMockCache{
Cache: cache,
cycleStart: make(chan struct{}),

View File

@ -772,6 +772,11 @@ func (c *configFactory) addNodeToCache(obj interface{}) {
glog.Errorf("scheduler cache AddNode failed: %v", err)
}
if c.enableEquivalenceClassCache {
// GetNodeCache() will lazily create a NodeCache for the given node if it does not exist.
c.equivalencePodCache.GetNodeCache(node.GetName())
}
c.podQueue.MoveAllToActiveQueue()
// NOTE: add a new node does not affect existing predicates in equivalence cache
}