diff --git a/plugin/pkg/scheduler/algorithm/predicates/utils.go b/plugin/pkg/scheduler/algorithm/predicates/utils.go index 9a96425d964..84d096d23aa 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/utils.go +++ b/plugin/pkg/scheduler/algorithm/predicates/utils.go @@ -18,6 +18,7 @@ package predicates import ( "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" ) @@ -64,3 +65,27 @@ func CreateSelectorFromLabels(aL map[string]string) labels.Selector { } return labels.Set(aL).AsSelector() } + +// GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused. +func GetEquivalencePod(pod *v1.Pod) interface{} { + // For now we only consider pods: + // 1. OwnerReferences is Controller + // 2. with same OwnerReferences + // to be equivalent + if len(pod.OwnerReferences) != 0 { + for _, ref := range pod.OwnerReferences { + if *ref.Controller { + // a pod can only belongs to one controller + return &EquivalencePod{ + ControllerRef: ref, + } + } + } + } + return nil +} + +// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods. +type EquivalencePod struct { + ControllerRef metav1.OwnerReference +} diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/BUILD b/plugin/pkg/scheduler/algorithmprovider/defaults/BUILD index a3f35290ccf..93a44c9e3d4 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/BUILD +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/BUILD @@ -20,8 +20,6 @@ go_library( "//plugin/pkg/scheduler/core:go_default_library", "//plugin/pkg/scheduler/factory:go_default_library", "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", ], ) diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index b9e75dccda5..46fa0e60bfb 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -20,8 +20,6 @@ import ( "os" "strconv" - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" @@ -89,7 +87,7 @@ func init() { factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodMatchNodeSelector) // Use equivalence class to speed up predicates & priorities - factory.RegisterGetEquivalencePodFunction(GetEquivalencePod) + factory.RegisterGetEquivalencePodFunction(predicates.GetEquivalencePod) // ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing // the number of pods (belonging to the same service) on the same node. @@ -252,27 +250,3 @@ func copyAndReplace(set sets.String, replaceWhat, replaceWith string) sets.Strin } return result } - -// GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused. -func GetEquivalencePod(pod *v1.Pod) interface{} { - // For now we only consider pods: - // 1. OwnerReferences is Controller - // 2. with same OwnerReferences - // to be equivalent - if len(pod.OwnerReferences) != 0 { - for _, ref := range pod.OwnerReferences { - if *ref.Controller { - // a pod can only belongs to one controller - return &EquivalencePod{ - ControllerRef: ref, - } - } - } - } - return nil -} - -// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods. -type EquivalencePod struct { - ControllerRef metav1.OwnerReference -} diff --git a/plugin/pkg/scheduler/core/equivalence_cache.go b/plugin/pkg/scheduler/core/equivalence_cache.go index 7b192502b23..28d12614b1f 100644 --- a/plugin/pkg/scheduler/core/equivalence_cache.go +++ b/plugin/pkg/scheduler/core/equivalence_cache.go @@ -66,7 +66,7 @@ func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) } // UpdateCachedPredicateItem updates pod predicate for equivalence class -func (ec *EquivalenceCache) UpdateCachedPredicateItem(pod *v1.Pod, nodeName, predicateKey string, fit bool, reasons []algorithm.PredicateFailureReason, equivalenceHash uint64) { +func (ec *EquivalenceCache) UpdateCachedPredicateItem(podName, nodeName, predicateKey string, fit bool, reasons []algorithm.PredicateFailureReason, equivalenceHash uint64) { ec.Lock() defer ec.Unlock() if _, exist := ec.algorithmCache[nodeName]; !exist { @@ -87,7 +87,7 @@ func (ec *EquivalenceCache) UpdateCachedPredicateItem(pod *v1.Pod, nodeName, pre equivalenceHash: predicateItem, }) } - glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, pod.GetName(), nodeName, predicateItem) + glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, podName, nodeName, predicateItem) } // PredicateWithECache returns: @@ -95,10 +95,10 @@ func (ec *EquivalenceCache) UpdateCachedPredicateItem(pod *v1.Pod, nodeName, pre // 2. reasons if not fit // 3. if this cache is invalid // based on cached predicate results -func (ec *EquivalenceCache) PredicateWithECache(pod *v1.Pod, nodeName, predicateKey string, equivalenceHash uint64) (bool, []algorithm.PredicateFailureReason, bool) { +func (ec *EquivalenceCache) PredicateWithECache(podName, nodeName, predicateKey string, equivalenceHash uint64) (bool, []algorithm.PredicateFailureReason, bool) { ec.RLock() defer ec.RUnlock() - glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, pod.GetName(), nodeName) + glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, podName, nodeName) if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist { predicateMap := cachePredicate.(PredicateMap) @@ -158,31 +158,6 @@ func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName stri glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName) } -// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod (i.e. equivalenceHash), -// on the given node as invalid -func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPod(nodeName string, predicateKeys sets.String, pod *v1.Pod) { - if len(predicateKeys) == 0 { - return - } - equivalenceHash := ec.getHashEquivalencePod(pod) - if equivalenceHash == 0 { - // no equivalence pod found, just return - return - } - ec.Lock() - defer ec.Unlock() - if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { - for predicateKey := range predicateKeys { - if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist { - // got the cached item of by predicateKey & pod - predicateMap := cachePredicate.(PredicateMap) - delete(predicateMap, equivalenceHash) - } - } - } - glog.V(5).Infof("Done invalidating cached predicates %v on node %s, for pod %v", predicateKeys, nodeName, pod.GetName()) -} - // InvalidateCachedPredicateItemForPodAdd is a wrapper of InvalidateCachedPredicateItem for pod add case func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) { // MatchInterPodAffinity: we assume scheduler can make sure newly binded pod diff --git a/plugin/pkg/scheduler/core/equivalence_cache_test.go b/plugin/pkg/scheduler/core/equivalence_cache_test.go index 5f58a798119..5c069a89e1a 100644 --- a/plugin/pkg/scheduler/core/equivalence_cache_test.go +++ b/plugin/pkg/scheduler/core/equivalence_cache_test.go @@ -27,10 +27,15 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" ) +type predicateItemType struct { + fit bool + reasons []algorithm.PredicateFailureReason +} + func TestUpdateCachedPredicateItem(t *testing.T) { tests := []struct { name string - pod *v1.Pod + pod string predicateKey string nodeName string fit bool @@ -41,7 +46,7 @@ func TestUpdateCachedPredicateItem(t *testing.T) { }{ { name: "test 1", - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}}, + pod: "testPod", predicateKey: "GeneralPredicates", nodeName: "node1", fit: true, @@ -53,7 +58,7 @@ func TestUpdateCachedPredicateItem(t *testing.T) { }, { name: "test 2", - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}}, + pod: "testPod", predicateKey: "GeneralPredicates", nodeName: "node2", fit: false, @@ -78,29 +83,33 @@ func TestUpdateCachedPredicateItem(t *testing.T) { test.equivalenceHash: predicateItem, }) } - ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.fit, test.reasons, test.equivalenceHash) + ecache.UpdateCachedPredicateItem( + test.pod, + test.nodeName, + test.predicateKey, + test.fit, + test.reasons, + test.equivalenceHash, + ) value, ok := ecache.algorithmCache[test.nodeName].predicatesCache.Get(test.predicateKey) if !ok { - t.Errorf("Failed : %s, can't find expected cache item: %v", test.name, test.expectCacheItem) + t.Errorf("Failed: %s, can't find expected cache item: %v", + test.name, test.expectCacheItem) } else { cachedMapItem := value.(PredicateMap) if !reflect.DeepEqual(cachedMapItem[test.equivalenceHash], test.expectCacheItem) { - t.Errorf("Failed : %s, expected cached item: %v, but got: %v", test.name, test.expectCacheItem, cachedMapItem[test.equivalenceHash]) + t.Errorf("Failed: %s, expected cached item: %v, but got: %v", + test.name, test.expectCacheItem, cachedMapItem[test.equivalenceHash]) } } } } -type predicateItemType struct { - fit bool - reasons []algorithm.PredicateFailureReason -} - -func TestCachedPredicateItem(t *testing.T) { +func TestPredicateWithECache(t *testing.T) { tests := []struct { name string - pod *v1.Pod + podName string nodeName string predicateKey string equivalenceHashForUpdatePredicate uint64 @@ -112,7 +121,7 @@ func TestCachedPredicateItem(t *testing.T) { }{ { name: "test 1", - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}}, + podName: "testPod", nodeName: "node1", equivalenceHashForUpdatePredicate: 123, equivalenceHashForCalPredicate: 123, @@ -129,7 +138,7 @@ func TestCachedPredicateItem(t *testing.T) { }, { name: "test 2", - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}}, + podName: "testPod", nodeName: "node2", equivalenceHashForUpdatePredicate: 123, equivalenceHashForCalPredicate: 123, @@ -145,7 +154,7 @@ func TestCachedPredicateItem(t *testing.T) { }, { name: "test 3", - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}}, + podName: "testPod", nodeName: "node3", equivalenceHashForUpdatePredicate: 123, equivalenceHashForCalPredicate: 123, @@ -162,7 +171,7 @@ func TestCachedPredicateItem(t *testing.T) { }, { name: "test 4", - pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}}, + podName: "testPod", nodeName: "node4", equivalenceHashForUpdatePredicate: 123, equivalenceHashForCalPredicate: 456, @@ -185,7 +194,14 @@ func TestCachedPredicateItem(t *testing.T) { fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil } ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc) // set cached item to equivalence cache - ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHashForUpdatePredicate) + ecache.UpdateCachedPredicateItem( + test.podName, + test.nodeName, + test.predicateKey, + test.cachedItem.fit, + test.cachedItem.reasons, + test.equivalenceHashForUpdatePredicate, + ) // if we want to do invalid, invalid the cached item if test.expectedInvalidPredicateKey { predicateKeys := sets.NewString() @@ -193,23 +209,241 @@ func TestCachedPredicateItem(t *testing.T) { ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) } // calculate predicate with equivalence cache - fit, reasons, invalid := ecache.PredicateWithECache(test.pod, test.nodeName, test.predicateKey, test.equivalenceHashForCalPredicate) + fit, reasons, invalid := ecache.PredicateWithECache(test.podName, + test.nodeName, + test.predicateKey, + test.equivalenceHashForCalPredicate, + ) // returned invalid should match expectedInvalidPredicateKey or expectedInvalidEquivalenceHash if test.equivalenceHashForUpdatePredicate != test.equivalenceHashForCalPredicate { if invalid != test.expectedInvalidEquivalenceHash { - t.Errorf("Failed : %s when using invalid equivalenceHash, expected invalid: %v, but got: %v", test.name, test.expectedInvalidEquivalenceHash, invalid) + t.Errorf("Failed: %s, expected invalid: %v, but got: %v", + test.name, test.expectedInvalidEquivalenceHash, invalid) } } else { if invalid != test.expectedInvalidPredicateKey { - t.Errorf("Failed : %s, expected invalid: %v, but got: %v", test.name, test.expectedInvalidPredicateKey, invalid) + t.Errorf("Failed: %s, expected invalid: %v, but got: %v", + test.name, test.expectedInvalidPredicateKey, invalid) } } // returned predicate result should match expected predicate item if fit != test.expectedPredicateItem.fit { - t.Errorf("Failed : %s, expected fit: %v, but got: %v", test.name, test.cachedItem.fit, fit) + t.Errorf("Failed: %s, expected fit: %v, but got: %v", test.name, test.cachedItem.fit, fit) } if !reflect.DeepEqual(reasons, test.expectedPredicateItem.reasons) { - t.Errorf("Failed : %s, expected reasons: %v, but got: %v", test.name, test.cachedItem.reasons, reasons) + t.Errorf("Failed: %s, expected reasons: %v, but got: %v", + test.name, test.cachedItem.reasons, reasons) + } + } +} + +func TestGetHashEquivalencePod(t *testing.T) { + // use default equivalence class calculator + ecache := NewEquivalenceCache(predicates.GetEquivalencePod) + + isController := true + pod1 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "ReplicationController", + Name: "rc", + UID: "123", + Controller: &isController, + }, + }, + }, + } + + pod2 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "ReplicationController", + Name: "rc", + UID: "123", + Controller: &isController, + }, + }, + }, + } + + pod3 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod3", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "ReplicationController", + Name: "rc", + UID: "567", + Controller: &isController, + }, + }, + }, + } + + hash1 := ecache.getHashEquivalencePod(pod1) + hash2 := ecache.getHashEquivalencePod(pod2) + hash3 := ecache.getHashEquivalencePod(pod3) + + if hash1 != hash2 { + t.Errorf("Failed: pod %v and %v is expected to be equivalent", pod1.Name, pod2.Name) + } + + if hash2 == hash3 { + t.Errorf("Failed: pod %v and %v is not expected to be equivalent", pod2.Name, pod3.Name) + } + + // pod4 is a pod without controller ref + pod4 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod4", + }, + } + hash4 := ecache.getHashEquivalencePod(pod4) + + if hash4 != 0 { + t.Errorf("Failed: equivalence hash of pod %v is expected to be: 0, but got: %v", + pod4.Name, hash4) + } +} + +func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) { + testPredicate := "GeneralPredicates" + // tests is used to initialize all nodes + tests := []struct { + podName string + nodeName string + predicateKey string + equivalenceHashForUpdatePredicate uint64 + cachedItem predicateItemType + }{ + { + podName: "testPod", + nodeName: "node1", + equivalenceHashForUpdatePredicate: 123, + cachedItem: predicateItemType{ + fit: false, + reasons: []algorithm.PredicateFailureReason{ + predicates.ErrPodNotFitsHostPorts, + }, + }, + }, + { + podName: "testPod", + nodeName: "node2", + equivalenceHashForUpdatePredicate: 456, + cachedItem: predicateItemType{ + fit: false, + reasons: []algorithm.PredicateFailureReason{ + predicates.ErrPodNotFitsHostPorts, + }, + }, + }, + { + podName: "testPod", + nodeName: "node3", + equivalenceHashForUpdatePredicate: 123, + cachedItem: predicateItemType{ + fit: true, + }, + }, + } + // this case does not need to calculate equivalence hash, just pass an empty function + fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil } + ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc) + + for _, test := range tests { + // set cached item to equivalence cache + ecache.UpdateCachedPredicateItem( + test.podName, + test.nodeName, + testPredicate, + test.cachedItem.fit, + test.cachedItem.reasons, + test.equivalenceHashForUpdatePredicate, + ) + } + + // invalidate cached predicate for all nodes + ecache.InvalidateCachedPredicateItemOfAllNodes(sets.NewString(testPredicate)) + + // there should be no cached predicate any more + for _, test := range tests { + if algorithmCache, exist := ecache.algorithmCache[test.nodeName]; exist { + if _, exist := algorithmCache.predicatesCache.Get(testPredicate); exist { + t.Errorf("Failed: cached item for predicate key: %v on node: %v should be invalidated", + testPredicate, test.nodeName) + break + } + } + } +} + +func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) { + testPredicate := "GeneralPredicates" + // tests is used to initialize all nodes + tests := []struct { + podName string + nodeName string + predicateKey string + equivalenceHashForUpdatePredicate uint64 + cachedItem predicateItemType + }{ + { + podName: "testPod", + nodeName: "node1", + equivalenceHashForUpdatePredicate: 123, + cachedItem: predicateItemType{ + fit: false, + reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, + }, + }, + { + podName: "testPod", + nodeName: "node2", + equivalenceHashForUpdatePredicate: 456, + cachedItem: predicateItemType{ + fit: false, + reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, + }, + }, + { + podName: "testPod", + nodeName: "node3", + equivalenceHashForUpdatePredicate: 123, + cachedItem: predicateItemType{ + fit: true, + }, + }, + } + // this case does not need to calculate equivalence hash, just pass an empty function + fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil } + ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc) + + for _, test := range tests { + // set cached item to equivalence cache + ecache.UpdateCachedPredicateItem( + test.podName, + test.nodeName, + testPredicate, + test.cachedItem.fit, + test.cachedItem.reasons, + test.equivalenceHashForUpdatePredicate, + ) + } + + for _, test := range tests { + // invalidate cached predicate for all nodes + ecache.InvalidateAllCachedPredicateItemOfNode(test.nodeName) + if _, exist := ecache.algorithmCache[test.nodeName]; exist { + t.Errorf("Failed: cached item for node: %v should be invalidated", test.nodeName) + break } } } diff --git a/plugin/pkg/scheduler/core/generic_scheduler.go b/plugin/pkg/scheduler/core/generic_scheduler.go index 68ab9b389eb..9e882ef9ec5 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler.go +++ b/plugin/pkg/scheduler/core/generic_scheduler.go @@ -251,7 +251,7 @@ func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, // If equivalenceCache is available if eCacheAvailable { // PredicateWithECache will returns it's cached predicate results - fit, reasons, invalid = ecache.PredicateWithECache(pod, info.Node().GetName(), predicateKey, equivalenceHash) + fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash) } if !eCacheAvailable || invalid { @@ -264,7 +264,7 @@ func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, if eCacheAvailable { // update equivalence cache with newly computed fit & reasons // TODO(resouer) should we do this in another thread? any race? - ecache.UpdateCachedPredicateItem(pod, info.Node().GetName(), predicateKey, fit, reasons, equivalenceHash) + ecache.UpdateCachedPredicateItem(pod.GetName(), info.Node().GetName(), predicateKey, fit, reasons, equivalenceHash) } }