Merge pull request #64236 from misterikkit/ecache-move

Automatic merge from submit-queue (batch tested with PRs 65256, 64236, 64919, 64879, 57932). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Move equivalence cache to its own package

**What this PR does / why we need it**:
Untangle equivalence class implementation from genericScheduler implementation. This makes it clearer where the two interact, and prevents them from accessing unexported internals.



**Special notes for your reviewer**:
Please review https://github.com/kubernetes/kubernetes/pull/63942 first as this is based on it.
This also needs to rebase on https://github.com/kubernetes/kubernetes/pull/64216 to pass tests.
**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2018-06-20 17:22:12 -07:00 committed by GitHub
commit 14268aef75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 255 additions and 203 deletions

View File

@ -46,6 +46,7 @@ go_library(
"//pkg/scheduler/api:go_default_library", "//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library", "//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library", "//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library", "//pkg/scheduler/volumebinder:go_default_library",

View File

@ -9,7 +9,6 @@ load(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"equivalence_cache_test.go",
"extender_test.go", "extender_test.go",
"generic_scheduler_test.go", "generic_scheduler_test.go",
"scheduling_queue_test.go", "scheduling_queue_test.go",
@ -22,6 +21,7 @@ go_test(
"//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/api:go_default_library", "//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/testing:go_default_library", "//pkg/scheduler/testing:go_default_library",
"//pkg/scheduler/util:go_default_library", "//pkg/scheduler/util:go_default_library",
"//vendor/k8s.io/api/apps/v1beta1:go_default_library", "//vendor/k8s.io/api/apps/v1beta1:go_default_library",
@ -38,7 +38,6 @@ go_test(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"equivalence_cache.go",
"extender.go", "extender.go",
"generic_scheduler.go", "generic_scheduler.go",
"scheduling_queue.go", "scheduling_queue.go",
@ -51,10 +50,10 @@ go_library(
"//pkg/scheduler/algorithm/priorities/util:go_default_library", "//pkg/scheduler/algorithm/priorities/util:go_default_library",
"//pkg/scheduler/api:go_default_library", "//pkg/scheduler/api:go_default_library",
"//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/metrics:go_default_library", "//pkg/scheduler/metrics:go_default_library",
"//pkg/scheduler/util:go_default_library", "//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library", "//pkg/scheduler/volumebinder:go_default_library",
"//pkg/util/hash:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/api/policy/v1beta1:go_default_library",
@ -80,6 +79,9 @@ filegroup(
filegroup( filegroup(
name = "all-srcs", name = "all-srcs",
srcs = [":package-srcs"], srcs = [
":package-srcs",
"//pkg/scheduler/core/equivalence:all-srcs",
],
tags = ["automanaged"], tags = ["automanaged"],
) )

View File

@ -0,0 +1,47 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["eqivalence.go"],
importpath = "k8s.io/kubernetes/pkg/scheduler/core/equivalence",
visibility = ["//visibility:public"],
deps = [
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/util/hash:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["eqivalence_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/testing:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -14,7 +14,9 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package core // Package equivalence defines Pod equivalence classes and the equivalence class
// cache.
package equivalence
import ( import (
"fmt" "fmt"
@ -31,19 +33,51 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
// EquivalenceCache saves and reuses the output of predicate functions. Use // Cache saves and reuses the output of predicate functions. Use RunPredicate to
// RunPredicate to get or update the cached results. An appropriate Invalidate* // get or update the cached results. An appropriate Invalidate* function should
// function should be called when some predicate results are no longer valid. // be called when some predicate results are no longer valid.
// //
// Internally, results are keyed by node name, predicate name, and "equivalence // Internally, results are keyed by node name, predicate name, and "equivalence
// class". (Equivalence class is defined in the `EquivalenceClassInfo` type.) // class". (Equivalence class is defined in the `Class` type.) Saved results
// Saved results will be reused until an appropriate invalidation function is // will be reused until an appropriate invalidation function is called.
// called. type Cache struct {
type EquivalenceCache struct {
mu sync.RWMutex mu sync.RWMutex
cache nodeMap cache nodeMap
} }
// NewCache returns an empty Cache.
func NewCache() *Cache {
return &Cache{
cache: make(nodeMap),
}
}
// Class represents a set of pods which are equivalent from the perspective of
// the scheduler. i.e. the scheduler would make the same decision for any pod
// from the same class.
type Class struct {
// Equivalence hash
hash uint64
}
// NewClass returns the equivalence class for a given Pod. The returned Class
// objects will be equal for two Pods in the same class. nil values should not
// be considered equal to each other.
//
// NOTE: Make sure to compare types of Class and not *Class.
// TODO(misterikkit): Return error instead of nil *Class.
func NewClass(pod *v1.Pod) *Class {
equivalencePod := getEquivalencePod(pod)
if equivalencePod != nil {
hash := fnv.New32a()
hashutil.DeepHashObject(hash, equivalencePod)
return &Class{
hash: uint64(hash.Sum32()),
}
}
return nil
}
// nodeMap stores PredicateCaches with node name as the key. // nodeMap stores PredicateCaches with node name as the key.
type nodeMap map[string]predicateMap type nodeMap map[string]predicateMap
@ -59,25 +93,17 @@ type predicateResult struct {
FailReasons []algorithm.PredicateFailureReason FailReasons []algorithm.PredicateFailureReason
} }
// NewEquivalenceCache returns EquivalenceCache to speed up predicates by caching
// result from previous scheduling.
func NewEquivalenceCache() *EquivalenceCache {
return &EquivalenceCache{
cache: make(nodeMap),
}
}
// RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be // RunPredicate returns a cached predicate result. In case of a cache miss, the predicate will be
// run and its results cached for the next call. // run and its results cached for the next call.
// //
// NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale. // NOTE: RunPredicate will not update the equivalence cache if the given NodeInfo is stale.
func (ec *EquivalenceCache) RunPredicate( func (c *Cache) RunPredicate(
pred algorithm.FitPredicate, pred algorithm.FitPredicate,
predicateKey string, predicateKey string,
pod *v1.Pod, pod *v1.Pod,
meta algorithm.PredicateMetadata, meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo, nodeInfo *schedulercache.NodeInfo,
equivClassInfo *EquivalenceClassInfo, equivClass *Class,
cache schedulercache.Cache, cache schedulercache.Cache,
) (bool, []algorithm.PredicateFailureReason, error) { ) (bool, []algorithm.PredicateFailureReason, error) {
if nodeInfo == nil || nodeInfo.Node() == nil { if nodeInfo == nil || nodeInfo.Node() == nil {
@ -85,7 +111,7 @@ func (ec *EquivalenceCache) RunPredicate(
return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid") return false, []algorithm.PredicateFailureReason{}, fmt.Errorf("nodeInfo is nil or node is invalid")
} }
result, ok := ec.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClassInfo.hash) result, ok := c.lookupResult(pod.GetName(), nodeInfo.Node().GetName(), predicateKey, equivClass.hash)
if ok { if ok {
return result.Fit, result.FailReasons, nil return result.Fit, result.FailReasons, nil
} }
@ -94,13 +120,13 @@ func (ec *EquivalenceCache) RunPredicate(
return fit, reasons, err return fit, reasons, err
} }
if cache != nil { if cache != nil {
ec.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClassInfo.hash, cache, nodeInfo) c.updateResult(pod.GetName(), predicateKey, fit, reasons, equivClass.hash, cache, nodeInfo)
} }
return fit, reasons, nil return fit, reasons, nil
} }
// updateResult updates the cached result of a predicate. // updateResult updates the cached result of a predicate.
func (ec *EquivalenceCache) updateResult( func (c *Cache) updateResult(
podName, predicateKey string, podName, predicateKey string,
fit bool, fit bool,
reasons []algorithm.PredicateFailureReason, reasons []algorithm.PredicateFailureReason,
@ -108,8 +134,8 @@ func (ec *EquivalenceCache) updateResult(
cache schedulercache.Cache, cache schedulercache.Cache,
nodeInfo *schedulercache.NodeInfo, nodeInfo *schedulercache.NodeInfo,
) { ) {
ec.mu.Lock() c.mu.Lock()
defer ec.mu.Unlock() defer c.mu.Unlock()
if nodeInfo == nil || nodeInfo.Node() == nil { if nodeInfo == nil || nodeInfo.Node() == nil {
// This may happen during tests. // This may happen during tests.
return return
@ -119,81 +145,80 @@ func (ec *EquivalenceCache) updateResult(
return return
} }
nodeName := nodeInfo.Node().GetName() nodeName := nodeInfo.Node().GetName()
if _, exist := ec.cache[nodeName]; !exist { if _, exist := c.cache[nodeName]; !exist {
ec.cache[nodeName] = make(predicateMap) c.cache[nodeName] = make(predicateMap)
} }
predicateItem := predicateResult{ predicateItem := predicateResult{
Fit: fit, Fit: fit,
FailReasons: reasons, FailReasons: reasons,
} }
// if cached predicate map already exists, just update the predicate by key // if cached predicate map already exists, just update the predicate by key
if predicates, ok := ec.cache[nodeName][predicateKey]; ok { if predicates, ok := c.cache[nodeName][predicateKey]; ok {
// maps in golang are references, no need to add them back // maps in golang are references, no need to add them back
predicates[equivalenceHash] = predicateItem predicates[equivalenceHash] = predicateItem
} else { } else {
ec.cache[nodeName][predicateKey] = c.cache[nodeName][predicateKey] =
resultMap{ resultMap{
equivalenceHash: predicateItem, equivalenceHash: predicateItem,
} }
} }
glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, podName, nodeName, predicateItem) glog.V(5).Infof("Cache update: node=%s,predicate=%s,pod=%s,value=%v", nodeName, predicateKey, podName, predicateItem)
} }
// lookupResult returns cached predicate results and a bool saying whether a // lookupResult returns cached predicate results and a bool saying whether a
// cache entry was found. // cache entry was found.
func (ec *EquivalenceCache) lookupResult( func (c *Cache) lookupResult(
podName, nodeName, predicateKey string, podName, nodeName, predicateKey string,
equivalenceHash uint64, equivalenceHash uint64,
) (value predicateResult, ok bool) { ) (value predicateResult, ok bool) {
ec.mu.RLock() c.mu.RLock()
defer ec.mu.RUnlock() defer c.mu.RUnlock()
glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", glog.V(5).Infof("Cache lookup: node=%s,predicate=%s,pod=%s", nodeName, predicateKey, podName)
predicateKey, podName, nodeName) value, ok = c.cache[nodeName][predicateKey][equivalenceHash]
value, ok = ec.cache[nodeName][predicateKey][equivalenceHash]
return value, ok return value, ok
} }
// InvalidatePredicates clears all cached results for the given predicates. // InvalidatePredicates clears all cached results for the given predicates.
func (ec *EquivalenceCache) InvalidatePredicates(predicateKeys sets.String) { func (c *Cache) InvalidatePredicates(predicateKeys sets.String) {
if len(predicateKeys) == 0 { if len(predicateKeys) == 0 {
return return
} }
ec.mu.Lock() c.mu.Lock()
defer ec.mu.Unlock() defer c.mu.Unlock()
// ec.cache uses nodeName as key, so we just iterate it and invalid given predicates // c.cache uses nodeName as key, so we just iterate it and invalid given predicates
for _, predicates := range ec.cache { for _, predicates := range c.cache {
for predicateKey := range predicateKeys { for predicateKey := range predicateKeys {
delete(predicates, predicateKey) delete(predicates, predicateKey)
} }
} }
glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys) glog.V(5).Infof("Cache invalidation: node=*,predicates=%v", predicateKeys)
} }
// InvalidatePredicatesOnNode clears cached results for the given predicates on one node. // InvalidatePredicatesOnNode clears cached results for the given predicates on one node.
func (ec *EquivalenceCache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) { func (c *Cache) InvalidatePredicatesOnNode(nodeName string, predicateKeys sets.String) {
if len(predicateKeys) == 0 { if len(predicateKeys) == 0 {
return return
} }
ec.mu.Lock() c.mu.Lock()
defer ec.mu.Unlock() defer c.mu.Unlock()
for predicateKey := range predicateKeys { for predicateKey := range predicateKeys {
delete(ec.cache[nodeName], predicateKey) delete(c.cache[nodeName], predicateKey)
} }
glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName) glog.V(5).Infof("Cache invalidation: node=%s,predicates=%v", nodeName, predicateKeys)
} }
// InvalidateAllPredicatesOnNode clears all cached results for one node. // InvalidateAllPredicatesOnNode clears all cached results for one node.
func (ec *EquivalenceCache) InvalidateAllPredicatesOnNode(nodeName string) { func (c *Cache) InvalidateAllPredicatesOnNode(nodeName string) {
ec.mu.Lock() c.mu.Lock()
defer ec.mu.Unlock() defer c.mu.Unlock()
delete(ec.cache, nodeName) delete(c.cache, nodeName)
glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName) glog.V(5).Infof("Cache invalidation: node=%s,predicates=*", nodeName)
} }
// InvalidateCachedPredicateItemForPodAdd is a wrapper of // InvalidateCachedPredicateItemForPodAdd is a wrapper of
// InvalidateCachedPredicateItem for pod add case // InvalidateCachedPredicateItem for pod add case
// TODO: This logic does not belong with the equivalence cache implementation. // TODO: This does not belong with the equivalence cache implementation.
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) { func (c *Cache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
// MatchInterPodAffinity: we assume scheduler can make sure newly bound pod // MatchInterPodAffinity: we assume scheduler can make sure newly bound pod
// will not break the existing inter pod affinity. So we does not need to // will not break the existing inter pod affinity. So we does not need to
// invalidate MatchInterPodAffinity when pod added. // invalidate MatchInterPodAffinity when pod added.
@ -226,30 +251,7 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod,
} }
} }
} }
ec.InvalidatePredicatesOnNode(nodeName, invalidPredicates) c.InvalidatePredicatesOnNode(nodeName, invalidPredicates)
}
// EquivalenceClassInfo holds equivalence hash which is used for checking
// equivalence cache. We will pass this to podFitsOnNode to ensure equivalence
// hash is only calculated per schedule.
type EquivalenceClassInfo struct {
// Equivalence hash.
hash uint64
}
// GetEquivalenceClassInfo returns a hash of the given pod. The hashing function
// returns the same value for any two pods that are equivalent from the
// perspective of scheduling.
func (ec *EquivalenceCache) GetEquivalenceClassInfo(pod *v1.Pod) *EquivalenceClassInfo {
equivalencePod := getEquivalencePod(pod)
if equivalencePod != nil {
hash := fnv.New32a()
hashutil.DeepHashObject(hash, equivalencePod)
return &EquivalenceClassInfo{
hash: uint64(hash.Sum32()),
}
}
return nil
} }
// equivalencePod is the set of pod attributes which must match for two pods to // equivalencePod is the set of pod attributes which must match for two pods to

View File

@ -14,20 +14,17 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package core package equivalence
import ( import (
"errors" "errors"
"reflect" "reflect"
"sync"
"testing" "testing"
"time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/algorithm" "k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
@ -250,8 +247,8 @@ func TestRunPredicate(t *testing.T) {
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}} pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p1"}}
meta := algorithm.EmptyPredicateMetadataProducer(nil, nil) meta := algorithm.EmptyPredicateMetadataProducer(nil, nil)
ecache := NewEquivalenceCache() ecache := NewCache()
equivClass := ecache.GetEquivalenceClassInfo(pod) equivClass := NewClass(pod)
if test.expectCacheHit { if test.expectCacheHit {
ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node) ecache.updateResult(pod.Name, "testPredicate", test.expectFit, test.expectedReasons, equivClass.hash, test.cache, node)
} }
@ -342,7 +339,7 @@ func TestUpdateResult(t *testing.T) {
}, },
} }
for _, test := range tests { for _, test := range tests {
ecache := NewEquivalenceCache() ecache := NewCache()
if test.expectPredicateMap { if test.expectPredicateMap {
ecache.cache[test.nodeName] = make(predicateMap) ecache.cache[test.nodeName] = make(predicateMap)
predicateItem := predicateResult{ predicateItem := predicateResult{
@ -476,7 +473,7 @@ func TestLookupResult(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
ecache := NewEquivalenceCache() ecache := NewCache()
node := schedulercache.NewNodeInfo() node := schedulercache.NewNodeInfo()
node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}}) node.SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: test.nodeName}})
// set cached item to equivalence cache // set cached item to equivalence cache
@ -530,9 +527,6 @@ func TestLookupResult(t *testing.T) {
} }
func TestGetEquivalenceHash(t *testing.T) { func TestGetEquivalenceHash(t *testing.T) {
ecache := NewEquivalenceCache()
pod1 := makeBasicPod("pod1") pod1 := makeBasicPod("pod1")
pod2 := makeBasicPod("pod2") pod2 := makeBasicPod("pod2")
@ -623,7 +617,7 @@ func TestGetEquivalenceHash(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
for i, podInfo := range test.podInfoList { for i, podInfo := range test.podInfoList {
testPod := podInfo.pod testPod := podInfo.pod
eclassInfo := ecache.GetEquivalenceClassInfo(testPod) eclassInfo := NewClass(testPod)
if eclassInfo == nil && podInfo.hashIsValid { if eclassInfo == nil && podInfo.hashIsValid {
t.Errorf("Failed: pod %v is expected to have valid hash", testPod) t.Errorf("Failed: pod %v is expected to have valid hash", testPod)
} }
@ -691,7 +685,7 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
cache: &upToDateCache{}, cache: &upToDateCache{},
}, },
} }
ecache := NewEquivalenceCache() ecache := NewCache()
for _, test := range tests { for _, test := range tests {
node := schedulercache.NewNodeInfo() node := schedulercache.NewNodeInfo()
@ -763,7 +757,7 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
cache: &upToDateCache{}, cache: &upToDateCache{},
}, },
} }
ecache := NewEquivalenceCache() ecache := NewCache()
for _, test := range tests { for _, test := range tests {
node := schedulercache.NewNodeInfo() node := schedulercache.NewNodeInfo()
@ -796,100 +790,3 @@ func BenchmarkEquivalenceHash(b *testing.B) {
getEquivalencePod(pod) getEquivalencePod(pod)
} }
} }
// syncingMockCache delegates method calls to an actual Cache,
// but calls to UpdateNodeNameToInfoMap synchronize with the test.
type syncingMockCache struct {
schedulercache.Cache
cycleStart, cacheInvalidated chan struct{}
once sync.Once
}
// UpdateNodeNameToInfoMap delegates to the real implementation, but on the first call, it
// synchronizes with the test.
//
// Since UpdateNodeNameToInfoMap is one of the first steps of (*genericScheduler).Schedule, we use
// this point to signal to the test that a scheduling cycle has started.
func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
err := c.Cache.UpdateNodeNameToInfoMap(infoMap)
c.once.Do(func() {
c.cycleStart <- struct{}{}
<-c.cacheInvalidated
})
return err
}
// TestEquivalenceCacheInvalidationRace tests that equivalence cache invalidation is correctly
// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event
// occurs after schedulercache is snapshotted and before equivalence cache lock is acquired.
func TestEquivalenceCacheInvalidationRace(t *testing.T) {
// Create a predicate that returns false the first time and true on subsequent calls.
podWillFit := false
var callCount int
testPredicate := func(pod *v1.Pod,
meta algorithm.PredicateMetadata,
nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
callCount++
if !podWillFit {
podWillFit = true
return false, []algorithm.PredicateFailureReason{predicates.ErrFakePredicate}, nil
}
return true, nil, nil
}
// Set up the mock cache.
cache := schedulercache.New(time.Duration(0), wait.NeverStop)
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}})
mockCache := &syncingMockCache{
Cache: cache,
cycleStart: make(chan struct{}),
cacheInvalidated: make(chan struct{}),
}
eCache := NewEquivalenceCache()
// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
// the equivalence cache would be updated.
go func() {
<-mockCache.cycleStart
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"},
Spec: v1.PodSpec{NodeName: "machine1"}}
if err := cache.AddPod(pod); err != nil {
t.Errorf("Could not add pod to cache: %v", err)
}
eCache.InvalidateAllPredicatesOnNode("machine1")
mockCache.cacheInvalidated <- struct{}{}
}()
// Set up the scheduler.
ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
predicates.SetPredicatesOrdering([]string{"testPredicate"})
prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
scheduler := NewGenericScheduler(
mockCache,
eCache,
NewSchedulingQueue(),
ps,
algorithm.EmptyPredicateMetadataProducer,
prioritizers,
algorithm.EmptyPriorityMetadataProducer,
nil, nil, pvcLister, true, false)
// First scheduling attempt should fail.
nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
machine, err := scheduler.Schedule(pod, nodeLister)
if machine != "" || err == nil {
t.Error("First scheduling attempt did not fail")
}
// Second scheduling attempt should succeed because cache was invalidated.
_, err = scheduler.Schedule(pod, nodeLister)
if err != nil {
t.Errorf("Second scheduling attempt failed: %v", err)
}
if callCount != 2 {
t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
}
}

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates" "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder" "k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -85,7 +86,7 @@ func (f *FitError) Error() string {
type genericScheduler struct { type genericScheduler struct {
cache schedulercache.Cache cache schedulercache.Cache
equivalenceCache *EquivalenceCache equivalenceCache *equivalence.Cache
schedulingQueue SchedulingQueue schedulingQueue SchedulingQueue
predicates map[string]algorithm.FitPredicate predicates map[string]algorithm.FitPredicate
priorityMetaProducer algorithm.PriorityMetadataProducer priorityMetaProducer algorithm.PriorityMetadataProducer
@ -342,10 +343,10 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
// We can use the same metadata producer for all nodes. // We can use the same metadata producer for all nodes.
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap) meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
var equivCacheInfo *EquivalenceClassInfo var equivClass *equivalence.Class
if g.equivalenceCache != nil { if g.equivalenceCache != nil {
// getEquivalenceClassInfo will return immediately if no equivalence pod found // getEquivalenceClassInfo will return immediately if no equivalence pod found
equivCacheInfo = g.equivalenceCache.GetEquivalenceClassInfo(pod) equivClass = equivalence.NewClass(pod)
} }
checkNode := func(i int) { checkNode := func(i int) {
@ -359,7 +360,7 @@ func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v
g.equivalenceCache, g.equivalenceCache,
g.schedulingQueue, g.schedulingQueue,
g.alwaysCheckAllPredicates, g.alwaysCheckAllPredicates,
equivCacheInfo, equivClass,
) )
if err != nil { if err != nil {
predicateResultLock.Lock() predicateResultLock.Lock()
@ -459,10 +460,10 @@ func podFitsOnNode(
info *schedulercache.NodeInfo, info *schedulercache.NodeInfo,
predicateFuncs map[string]algorithm.FitPredicate, predicateFuncs map[string]algorithm.FitPredicate,
cache schedulercache.Cache, cache schedulercache.Cache,
ecache *EquivalenceCache, ecache *equivalence.Cache,
queue SchedulingQueue, queue SchedulingQueue,
alwaysCheckAllPredicates bool, alwaysCheckAllPredicates bool,
equivCacheInfo *EquivalenceClassInfo, equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) { ) (bool, []algorithm.PredicateFailureReason, error) {
var ( var (
eCacheAvailable bool eCacheAvailable bool
@ -499,7 +500,7 @@ func podFitsOnNode(
// Bypass eCache if node has any nominated pods. // Bypass eCache if node has any nominated pods.
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations // TODO(bsalamat): consider using eCache and adding proper eCache invalidations
// when pods are nominated or their nominations change. // when pods are nominated or their nominations change.
eCacheAvailable = equivCacheInfo != nil && !podsAdded eCacheAvailable = equivClass != nil && !podsAdded
for _, predicateKey := range predicates.Ordering() { for _, predicateKey := range predicates.Ordering() {
var ( var (
fit bool fit bool
@ -509,7 +510,7 @@ func podFitsOnNode(
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
if predicate, exist := predicateFuncs[predicateKey]; exist { if predicate, exist := predicateFuncs[predicateKey]; exist {
if eCacheAvailable { if eCacheAvailable {
fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivCacheInfo, cache) fit, reasons, err = ecache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
} else { } else {
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse) fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
} }
@ -1056,7 +1057,7 @@ func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeCla
// NewGenericScheduler creates a genericScheduler object. // NewGenericScheduler creates a genericScheduler object.
func NewGenericScheduler( func NewGenericScheduler(
cache schedulercache.Cache, cache schedulercache.Cache,
eCache *EquivalenceCache, eCache *equivalence.Cache,
podQueue SchedulingQueue, podQueue SchedulingQueue,
predicates map[string]algorithm.FitPredicate, predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.PredicateMetadataProducer, predicateMetaProducer algorithm.PredicateMetadataProducer,

View File

@ -22,6 +22,7 @@ import (
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -39,6 +40,7 @@ import (
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util" priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing"
) )
@ -1342,3 +1344,100 @@ func TestPreempt(t *testing.T) {
close(stop) close(stop)
} }
} }
// syncingMockCache delegates method calls to an actual Cache,
// but calls to UpdateNodeNameToInfoMap synchronize with the test.
type syncingMockCache struct {
	// Embedded real cache; every method other than UpdateNodeNameToInfoMap
	// passes straight through to it.
	schedulercache.Cache
	// cycleStart is signaled when the first scheduling cycle begins;
	// cacheInvalidated is then received from to block that cycle until the
	// test goroutine has finished invalidating the equivalence cache.
	cycleStart, cacheInvalidated chan struct{}
	// once restricts the cycleStart/cacheInvalidated rendezvous to the
	// first UpdateNodeNameToInfoMap call only.
	once sync.Once
}
// UpdateNodeNameToInfoMap forwards to the wrapped Cache implementation. On
// the very first invocation it additionally rendezvouses with the test:
//
// Since UpdateNodeNameToInfoMap is one of the first steps of
// (*genericScheduler).Schedule, this is the point where we signal the test
// that a scheduling cycle has started, then block until the test has
// performed its equivalence-cache invalidation.
func (c *syncingMockCache) UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error {
	updateErr := c.Cache.UpdateNodeNameToInfoMap(infoMap)
	c.once.Do(func() {
		// Announce that scheduling has begun, then wait for the test
		// goroutine to confirm the cache was invalidated.
		c.cycleStart <- struct{}{}
		<-c.cacheInvalidated
	})
	return updateErr
}
// TestCacheInvalidationRace tests that equivalence cache invalidation is correctly
// handled when an invalidation event happens early in a scheduling cycle. Specifically, the event
// occurs after schedulercache is snapshotted and before equivalence cache lock is acquired.
func TestCacheInvalidationRace(t *testing.T) {
	// Create a predicate that returns false the first time and true on subsequent calls.
	podWillFit := false
	var callCount int
	testPredicate := func(pod *v1.Pod,
		meta algorithm.PredicateMetadata,
		nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
		callCount++
		if !podWillFit {
			// First call fails; flip the flag so every later call fits.
			podWillFit = true
			return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
		}
		return true, nil, nil
	}

	// Set up the mock cache.
	// The syncingMockCache pauses the first scheduling cycle right after the
	// scheduler snapshots node info, which is the window this race lives in.
	cache := schedulercache.New(time.Duration(0), wait.NeverStop)
	cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1"}})
	mockCache := &syncingMockCache{
		Cache:            cache,
		cycleStart:       make(chan struct{}),
		cacheInvalidated: make(chan struct{}),
	}

	eCache := equivalence.NewCache()
	// Ensure that equivalence cache invalidation happens after the scheduling cycle starts, but before
	// the equivalence cache would be updated.
	go func() {
		// Block until the scheduler signals (via syncingMockCache) that a
		// cycle has started and the schedulercache snapshot is taken.
		<-mockCache.cycleStart
		pod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{Name: "new-pod", UID: "new-pod"},
			Spec:       v1.PodSpec{NodeName: "machine1"}}
		if err := cache.AddPod(pod); err != nil {
			t.Errorf("Could not add pod to cache: %v", err)
		}
		eCache.InvalidateAllPredicatesOnNode("machine1")
		// Unblock the paused scheduling cycle now that invalidation is done.
		mockCache.cacheInvalidated <- struct{}{}
	}()

	// Set up the scheduler.
	ps := map[string]algorithm.FitPredicate{"testPredicate": testPredicate}
	algorithmpredicates.SetPredicatesOrdering([]string{"testPredicate"})
	prioritizers := []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}}
	pvcLister := schedulertesting.FakePersistentVolumeClaimLister([]*v1.PersistentVolumeClaim{})
	scheduler := NewGenericScheduler(
		mockCache,
		eCache,
		NewSchedulingQueue(),
		ps,
		algorithm.EmptyPredicateMetadataProducer,
		prioritizers,
		algorithm.EmptyPriorityMetadataProducer,
		nil, nil, pvcLister, true, false)

	// First scheduling attempt should fail.
	nodeLister := schedulertesting.FakeNodeLister(makeNodeList([]string{"machine1"}))
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}}
	machine, err := scheduler.Schedule(pod, nodeLister)
	if machine != "" || err == nil {
		t.Error("First scheduling attempt did not fail")
	}

	// Second scheduling attempt should succeed because cache was invalidated.
	// If invalidation were mishandled, the stale "does not fit" result would be
	// served from the equivalence cache and this attempt would also fail.
	_, err = scheduler.Schedule(pod, nodeLister)
	if err != nil {
		t.Errorf("Second scheduling attempt failed: %v", err)
	}
	// callCount == 2 proves the second attempt actually re-ran the predicate
	// instead of hitting a stale equivalence-cache entry.
	if callCount != 2 {
		t.Errorf("Predicate should have been called twice. Was called %d times.", callCount)
	}
}

View File

@ -62,6 +62,7 @@ go_library(
"//pkg/scheduler/api/validation:go_default_library", "//pkg/scheduler/api/validation:go_default_library",
"//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/cache:go_default_library",
"//pkg/scheduler/core:go_default_library", "//pkg/scheduler/core:go_default_library",
"//pkg/scheduler/core/equivalence:go_default_library",
"//pkg/scheduler/util:go_default_library", "//pkg/scheduler/util:go_default_library",
"//pkg/scheduler/volumebinder:go_default_library", "//pkg/scheduler/volumebinder:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",

View File

@ -61,6 +61,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/api/validation" "k8s.io/kubernetes/pkg/scheduler/api/validation"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
"k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder" "k8s.io/kubernetes/pkg/scheduler/volumebinder"
) )
@ -123,7 +124,7 @@ type configFactory struct {
hardPodAffinitySymmetricWeight int32 hardPodAffinitySymmetricWeight int32
// Equivalence class cache // Equivalence class cache
equivalencePodCache *core.EquivalenceCache equivalencePodCache *equivalence.Cache
// Enable equivalence class cache // Enable equivalence class cache
enableEquivalenceClassCache bool enableEquivalenceClassCache bool
@ -1074,7 +1075,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
// Init equivalence class cache // Init equivalence class cache
if c.enableEquivalenceClassCache { if c.enableEquivalenceClassCache {
c.equivalencePodCache = core.NewEquivalenceCache() c.equivalencePodCache = equivalence.NewCache()
glog.Info("Created equivalence class cache") glog.Info("Created equivalence class cache")
} }

View File

@ -34,6 +34,7 @@ import (
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"k8s.io/kubernetes/pkg/scheduler/core" "k8s.io/kubernetes/pkg/scheduler/core"
"k8s.io/kubernetes/pkg/scheduler/core/equivalence"
"k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/volumebinder" "k8s.io/kubernetes/pkg/scheduler/volumebinder"
@ -104,7 +105,7 @@ type Config struct {
SchedulerCache schedulercache.Cache SchedulerCache schedulercache.Cache
// Ecache is used for optimistically invalid affected cache items after // Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod // successfully binding a pod
Ecache *core.EquivalenceCache Ecache *equivalence.Cache
NodeLister algorithm.NodeLister NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm Algorithm algorithm.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder GetBinder func(pod *v1.Pod) Binder