Merge pull request #49863 from resouer/eclass-test

Automatic merge from submit-queue (batch tested with PRs 46685, 49863, 50098, 50070, 50096)

Cover equivalence cache tests in scheduler core

**What this PR does / why we need it**:

Finish the last part of equivalence class scheduling #17390 : complete unit tests.

Also, removed `InvalidateCachedPredicateItemForPod` which is unused in latest design.

```
godep go test -v equivalence_cache_test.go  equivalence_cache.go  -cover

PASS
coverage: 92.3% of statements
ok  	command-line-arguments	0.071s
```

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-08-04 00:20:52 -07:00 committed by GitHub
commit 6065a0daab
6 changed files with 289 additions and 83 deletions

View File

@ -18,6 +18,7 @@ package predicates
import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)
@ -64,3 +65,27 @@ func CreateSelectorFromLabels(aL map[string]string) labels.Selector {
}
return labels.Set(aL).AsSelector()
}
// GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused.
func GetEquivalencePod(pod *v1.Pod) interface{} {
// For now we only consider pods:
// 1. OwnerReferences is Controller
// 2. with same OwnerReferences
// to be equivalent
if len(pod.OwnerReferences) != 0 {
for _, ref := range pod.OwnerReferences {
if *ref.Controller {
// a pod can only belongs to one controller
return &EquivalencePod{
ControllerRef: ref,
}
}
}
}
return nil
}
// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
type EquivalencePod struct {
ControllerRef metav1.OwnerReference
}

View File

@ -20,8 +20,6 @@ go_library(
"//plugin/pkg/scheduler/core:go_default_library",
"//plugin/pkg/scheduler/factory:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)

View File

@ -20,8 +20,6 @@ import (
"os"
"strconv"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@ -89,7 +87,7 @@ func init() {
factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodMatchNodeSelector)
// Use equivalence class to speed up predicates & priorities
factory.RegisterGetEquivalencePodFunction(GetEquivalencePod)
factory.RegisterGetEquivalencePodFunction(predicates.GetEquivalencePod)
// ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
// the number of pods (belonging to the same service) on the same node.
@ -252,27 +250,3 @@ func copyAndReplace(set sets.String, replaceWhat, replaceWith string) sets.Strin
}
return result
}
// GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused.
func GetEquivalencePod(pod *v1.Pod) interface{} {
// For now we only consider pods:
// 1. OwnerReferences is Controller
// 2. with same OwnerReferences
// to be equivalent
if len(pod.OwnerReferences) != 0 {
for _, ref := range pod.OwnerReferences {
if *ref.Controller {
// a pod can only belongs to one controller
return &EquivalencePod{
ControllerRef: ref,
}
}
}
}
return nil
}
// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
type EquivalencePod struct {
ControllerRef metav1.OwnerReference
}

View File

@ -66,7 +66,7 @@ func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc)
}
// UpdateCachedPredicateItem updates pod predicate for equivalence class
func (ec *EquivalenceCache) UpdateCachedPredicateItem(pod *v1.Pod, nodeName, predicateKey string, fit bool, reasons []algorithm.PredicateFailureReason, equivalenceHash uint64) {
func (ec *EquivalenceCache) UpdateCachedPredicateItem(podName, nodeName, predicateKey string, fit bool, reasons []algorithm.PredicateFailureReason, equivalenceHash uint64) {
ec.Lock()
defer ec.Unlock()
if _, exist := ec.algorithmCache[nodeName]; !exist {
@ -87,7 +87,7 @@ func (ec *EquivalenceCache) UpdateCachedPredicateItem(pod *v1.Pod, nodeName, pre
equivalenceHash: predicateItem,
})
}
glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, pod.GetName(), nodeName, predicateItem)
glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, podName, nodeName, predicateItem)
}
// PredicateWithECache returns:
@ -95,10 +95,10 @@ func (ec *EquivalenceCache) UpdateCachedPredicateItem(pod *v1.Pod, nodeName, pre
// 2. reasons if not fit
// 3. if this cache is invalid
// based on cached predicate results
func (ec *EquivalenceCache) PredicateWithECache(pod *v1.Pod, nodeName, predicateKey string, equivalenceHash uint64) (bool, []algorithm.PredicateFailureReason, bool) {
func (ec *EquivalenceCache) PredicateWithECache(podName, nodeName, predicateKey string, equivalenceHash uint64) (bool, []algorithm.PredicateFailureReason, bool) {
ec.RLock()
defer ec.RUnlock()
glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, pod.GetName(), nodeName)
glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, podName, nodeName)
if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
predicateMap := cachePredicate.(PredicateMap)
@ -158,31 +158,6 @@ func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName stri
glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
}
// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod (i.e. equivalenceHash),
// on the given node as invalid
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPod(nodeName string, predicateKeys sets.String, pod *v1.Pod) {
if len(predicateKeys) == 0 {
return
}
equivalenceHash := ec.getHashEquivalencePod(pod)
if equivalenceHash == 0 {
// no equivalence pod found, just return
return
}
ec.Lock()
defer ec.Unlock()
if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
for predicateKey := range predicateKeys {
if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
// got the cached item of by predicateKey & pod
predicateMap := cachePredicate.(PredicateMap)
delete(predicateMap, equivalenceHash)
}
}
}
glog.V(5).Infof("Done invalidating cached predicates %v on node %s, for pod %v", predicateKeys, nodeName, pod.GetName())
}
// InvalidateCachedPredicateItemForPodAdd is a wrapper of InvalidateCachedPredicateItem for pod add case
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
// MatchInterPodAffinity: we assume scheduler can make sure newly binded pod

View File

@ -27,10 +27,15 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)
type predicateItemType struct {
fit bool
reasons []algorithm.PredicateFailureReason
}
func TestUpdateCachedPredicateItem(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
pod string
predicateKey string
nodeName string
fit bool
@ -41,7 +46,7 @@ func TestUpdateCachedPredicateItem(t *testing.T) {
}{
{
name: "test 1",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}},
pod: "testPod",
predicateKey: "GeneralPredicates",
nodeName: "node1",
fit: true,
@ -53,7 +58,7 @@ func TestUpdateCachedPredicateItem(t *testing.T) {
},
{
name: "test 2",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}},
pod: "testPod",
predicateKey: "GeneralPredicates",
nodeName: "node2",
fit: false,
@ -78,29 +83,33 @@ func TestUpdateCachedPredicateItem(t *testing.T) {
test.equivalenceHash: predicateItem,
})
}
ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.fit, test.reasons, test.equivalenceHash)
ecache.UpdateCachedPredicateItem(
test.pod,
test.nodeName,
test.predicateKey,
test.fit,
test.reasons,
test.equivalenceHash,
)
value, ok := ecache.algorithmCache[test.nodeName].predicatesCache.Get(test.predicateKey)
if !ok {
t.Errorf("Failed : %s, can't find expected cache item: %v", test.name, test.expectCacheItem)
t.Errorf("Failed: %s, can't find expected cache item: %v",
test.name, test.expectCacheItem)
} else {
cachedMapItem := value.(PredicateMap)
if !reflect.DeepEqual(cachedMapItem[test.equivalenceHash], test.expectCacheItem) {
t.Errorf("Failed : %s, expected cached item: %v, but got: %v", test.name, test.expectCacheItem, cachedMapItem[test.equivalenceHash])
t.Errorf("Failed: %s, expected cached item: %v, but got: %v",
test.name, test.expectCacheItem, cachedMapItem[test.equivalenceHash])
}
}
}
}
type predicateItemType struct {
fit bool
reasons []algorithm.PredicateFailureReason
}
func TestCachedPredicateItem(t *testing.T) {
func TestPredicateWithECache(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
podName string
nodeName string
predicateKey string
equivalenceHashForUpdatePredicate uint64
@ -112,7 +121,7 @@ func TestCachedPredicateItem(t *testing.T) {
}{
{
name: "test 1",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}},
podName: "testPod",
nodeName: "node1",
equivalenceHashForUpdatePredicate: 123,
equivalenceHashForCalPredicate: 123,
@ -129,7 +138,7 @@ func TestCachedPredicateItem(t *testing.T) {
},
{
name: "test 2",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}},
podName: "testPod",
nodeName: "node2",
equivalenceHashForUpdatePredicate: 123,
equivalenceHashForCalPredicate: 123,
@ -145,7 +154,7 @@ func TestCachedPredicateItem(t *testing.T) {
},
{
name: "test 3",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}},
podName: "testPod",
nodeName: "node3",
equivalenceHashForUpdatePredicate: 123,
equivalenceHashForCalPredicate: 123,
@ -162,7 +171,7 @@ func TestCachedPredicateItem(t *testing.T) {
},
{
name: "test 4",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}},
podName: "testPod",
nodeName: "node4",
equivalenceHashForUpdatePredicate: 123,
equivalenceHashForCalPredicate: 456,
@ -185,7 +194,14 @@ func TestCachedPredicateItem(t *testing.T) {
fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil }
ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc)
// set cached item to equivalence cache
ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHashForUpdatePredicate)
ecache.UpdateCachedPredicateItem(
test.podName,
test.nodeName,
test.predicateKey,
test.cachedItem.fit,
test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate,
)
// if we want to do invalid, invalid the cached item
if test.expectedInvalidPredicateKey {
predicateKeys := sets.NewString()
@ -193,23 +209,241 @@ func TestCachedPredicateItem(t *testing.T) {
ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys)
}
// calculate predicate with equivalence cache
fit, reasons, invalid := ecache.PredicateWithECache(test.pod, test.nodeName, test.predicateKey, test.equivalenceHashForCalPredicate)
fit, reasons, invalid := ecache.PredicateWithECache(test.podName,
test.nodeName,
test.predicateKey,
test.equivalenceHashForCalPredicate,
)
// returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash
if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate {
if invalid != test.expectedInvalidEquivalenceHash {
t.Errorf("Failed : %s when using invalid equivalenceHash, expected invalid: %v, but got: %v", test.name, test.expectedInvalidEquivalenceHash, invalid)
t.Errorf("Failed: %s, expected invalid: %v, but got: %v",
test.name, test.expectedInvalidEquivalenceHash, invalid)
}
} else {
if invalid != test.expectedInvalidPredicateKey {
t.Errorf("Failed : %s, expected invalid: %v, but got: %v", test.name, test.expectedInvalidPredicateKey, invalid)
t.Errorf("Failed: %s, expected invalid: %v, but got: %v",
test.name, test.expectedInvalidPredicateKey, invalid)
}
}
// returned predicate result should match expected predicate item
if fit != test.expectedPredicateItem.fit {
t.Errorf("Failed : %s, expected fit: %v, but got: %v", test.name, test.cachedItem.fit, fit)
t.Errorf("Failed: %s, expected fit: %v, but got: %v", test.name, test.cachedItem.fit, fit)
}
if !reflect.DeepEqual(reasons, test.expectedPredicateItem.reasons) {
t.Errorf("Failed : %s, expected reasons: %v, but got: %v", test.name, test.cachedItem.reasons, reasons)
t.Errorf("Failed: %s, expected reasons: %v, but got: %v",
test.name, test.cachedItem.reasons, reasons)
}
}
}
func TestGetHashEquivalencePod(t *testing.T) {
// use default equivalence class calculator
ecache := NewEquivalenceCache(predicates.GetEquivalencePod)
isController := true
pod1 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "ReplicationController",
Name: "rc",
UID: "123",
Controller: &isController,
},
},
},
}
pod2 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "ReplicationController",
Name: "rc",
UID: "123",
Controller: &isController,
},
},
},
}
pod3 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod3",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "ReplicationController",
Name: "rc",
UID: "567",
Controller: &isController,
},
},
},
}
hash1 := ecache.getHashEquivalencePod(pod1)
hash2 := ecache.getHashEquivalencePod(pod2)
hash3 := ecache.getHashEquivalencePod(pod3)
if hash1 != hash2 {
t.Errorf("Failed: pod %v and %v is expected to be equivalent", pod1.Name, pod2.Name)
}
if hash2 == hash3 {
t.Errorf("Failed: pod %v and %v is not expected to be equivalent", pod2.Name, pod3.Name)
}
// pod4 is a pod without controller ref
pod4 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod4",
},
}
hash4 := ecache.getHashEquivalencePod(pod4)
if hash4 != 0 {
t.Errorf("Failed: equivalence hash of pod %v is expected to be: 0, but got: %v",
pod4.Name, hash4)
}
}
func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
testPredicate := "GeneralPredicates"
// tests is used to initialize all nodes
tests := []struct {
podName string
nodeName string
predicateKey string
equivalenceHashForUpdatePredicate uint64
cachedItem predicateItemType
}{
{
podName: "testPod",
nodeName: "node1",
equivalenceHashForUpdatePredicate: 123,
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{
predicates.ErrPodNotFitsHostPorts,
},
},
},
{
podName: "testPod",
nodeName: "node2",
equivalenceHashForUpdatePredicate: 456,
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{
predicates.ErrPodNotFitsHostPorts,
},
},
},
{
podName: "testPod",
nodeName: "node3",
equivalenceHashForUpdatePredicate: 123,
cachedItem: predicateItemType{
fit: true,
},
},
}
// this case does not need to calculate equivalence hash, just pass an empty function
fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil }
ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc)
for _, test := range tests {
// set cached item to equivalence cache
ecache.UpdateCachedPredicateItem(
test.podName,
test.nodeName,
testPredicate,
test.cachedItem.fit,
test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate,
)
}
// invalidate cached predicate for all nodes
ecache.InvalidateCachedPredicateItemOfAllNodes(sets.NewString(testPredicate))
// there should be no cached predicate any more
for _, test := range tests {
if algorithmCache, exist := ecache.algorithmCache[test.nodeName]; exist {
if _, exist := algorithmCache.predicatesCache.Get(testPredicate); exist {
t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated",
testPredicate, test.nodeName)
break
}
}
}
}
func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
testPredicate := "GeneralPredicates"
// tests is used to initialize all nodes
tests := []struct {
podName string
nodeName string
predicateKey string
equivalenceHashForUpdatePredicate uint64
cachedItem predicateItemType
}{
{
podName: "testPod",
nodeName: "node1",
equivalenceHashForUpdatePredicate: 123,
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
},
},
{
podName: "testPod",
nodeName: "node2",
equivalenceHashForUpdatePredicate: 456,
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
},
},
{
podName: "testPod",
nodeName: "node3",
equivalenceHashForUpdatePredicate: 123,
cachedItem: predicateItemType{
fit: true,
},
},
}
// this case does not need to calculate equivalence hash, just pass an empty function
fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil }
ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc)
for _, test := range tests {
// set cached item to equivalence cache
ecache.UpdateCachedPredicateItem(
test.podName,
test.nodeName,
testPredicate,
test.cachedItem.fit,
test.cachedItem.reasons,
test.equivalenceHashForUpdatePredicate,
)
}
for _, test := range tests {
// invalidate cached predicate for all nodes
ecache.InvalidateAllCachedPredicateItemOfNode(test.nodeName)
if _, exist := ecache.algorithmCache[test.nodeName]; exist {
t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName)
break
}
}
}

View File

@ -251,7 +251,7 @@ func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo,
// If equivalenceCache is available
if eCacheAvailable {
// PredicateWithECache will returns it's cached predicate results
fit, reasons, invalid = ecache.PredicateWithECache(pod, info.Node().GetName(), predicateKey, equivalenceHash)
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
}
if !eCacheAvailable || invalid {
@ -264,7 +264,7 @@ func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo,
if eCacheAvailable {
// update equivalence cache with newly computed fit & reasons
// TODO(resouer) should we do this in another thread? any race?
ecache.UpdateCachedPredicateItem(pod, info.Node().GetName(), predicateKey, fit, reasons, equivalenceHash)
ecache.UpdateCachedPredicateItem(pod.GetName(), info.Node().GetName(), predicateKey, fit, reasons, equivalenceHash)
}
}