Update equivalence cache to use predicate as key

Remove Invalid field from host predicate
This commit is contained in:
Harry Zhang 2017-02-15 16:55:02 +08:00
parent c888069a3b
commit 819554f514
3 changed files with 275 additions and 64 deletions

View File

@ -247,22 +247,21 @@ func copyAndReplace(set sets.String, replaceWhat, replaceWith string) sets.Strin
// GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused.
func GetEquivalencePod(pod *v1.Pod) interface{} {
equivalencePod := EquivalencePod{}
// For now we only consider pods:
// 1. OwnerReferences is Controller
// 2. OwnerReferences kind is in valid controller kinds
// 3. with same OwnerReferences
// 2. with same OwnerReferences
// to be equivalent
if len(pod.OwnerReferences) != 0 {
for _, ref := range pod.OwnerReferences {
if *ref.Controller {
equivalencePod.ControllerRef = ref
// a pod can only belongs to one controller
break
return &EquivalencePod{
ControllerRef: ref,
}
}
}
}
return &equivalencePod
return nil
}
// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.

View File

@ -18,18 +18,19 @@ package core
import (
"hash/fnv"
"github.com/golang/groupcache/lru"
"sync"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/v1"
hashutil "k8s.io/kubernetes/pkg/util/hash"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"github.com/golang/glog"
"github.com/golang/groupcache/lru"
)
// TODO(harryz) figure out the right number for this, 4096 may be too big
const maxCacheEntries = 4096
// we use predicate names as cache's key, its count is limited
const maxCacheEntries = 100
type HostPredicate struct {
Fit bool
@ -41,6 +42,9 @@ type AlgorithmCache struct {
predicatesCache *lru.Cache
}
// PredicateMap use equivalence hash as key
type PredicateMap map[uint64]HostPredicate
func newAlgorithmCache() AlgorithmCache {
return AlgorithmCache{
predicatesCache: lru.New(maxCacheEntries),
@ -61,74 +65,151 @@ func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc)
}
}
// addPodPredicate adds pod predicate for equivalence class
func (ec *EquivalenceCache) addPodPredicate(podKey uint64, nodeName string, fit bool, failReasons []algorithm.PredicateFailureReason) {
// UpdateCachedPredicateItem updates pod predicate for equivalence class
func (ec *EquivalenceCache) UpdateCachedPredicateItem(pod *v1.Pod, nodeName, predicateKey string, fit bool, reasons []algorithm.PredicateFailureReason, equivalenceHash uint64) {
ec.Lock()
defer ec.Unlock()
if _, exist := ec.algorithmCache[nodeName]; !exist {
ec.algorithmCache[nodeName] = newAlgorithmCache()
}
ec.algorithmCache[nodeName].predicatesCache.Add(podKey, HostPredicate{Fit: fit, FailReasons: failReasons})
predicateItem := HostPredicate{
Fit: fit,
FailReasons: reasons,
}
// if cached predicate map already exists, just update the predicate by key
if v, ok := ec.algorithmCache[nodeName].predicatesCache.Get(predicateKey); ok {
predicateMap := v.(PredicateMap)
// maps in golang are references, no need to add them back
predicateMap[equivalenceHash] = predicateItem
} else {
ec.algorithmCache[nodeName].predicatesCache.Add(predicateKey,
PredicateMap{
equivalenceHash: predicateItem,
})
}
glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, pod.GetName(), nodeName, predicateItem)
}
// AddPodPredicatesCache cache pod predicate for equivalence class
func (ec *EquivalenceCache) AddPodPredicatesCache(pod *v1.Pod, fitNodeList []*v1.Node, failedPredicates *FailedPredicateMap) {
equivalenceHash := ec.hashEquivalencePod(pod)
for _, fitNode := range fitNodeList {
ec.addPodPredicate(equivalenceHash, fitNode.Name, true, nil)
}
for failNodeName, failReasons := range *failedPredicates {
ec.addPodPredicate(equivalenceHash, failNodeName, false, failReasons)
}
}
// GetCachedPredicates gets cached predicates for equivalence class
func (ec *EquivalenceCache) GetCachedPredicates(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, []*v1.Node) {
fitNodeList := []*v1.Node{}
failedPredicates := FailedPredicateMap{}
noCacheNodeList := []*v1.Node{}
equivalenceHash := ec.hashEquivalencePod(pod)
for _, node := range nodes {
findCache := false
if algorithmCache, exist := ec.algorithmCache[node.Name]; exist {
if cachePredicate, exist := algorithmCache.predicatesCache.Get(equivalenceHash); exist {
hostPredicate := cachePredicate.(HostPredicate)
// PredicateWithECache returns:
// 1. if fit
// 2. reasons if not fit
// 3. if this cache is invalid
// based on cached predicate results
func (ec *EquivalenceCache) PredicateWithECache(pod *v1.Pod, nodeName, predicateKey string, equivalenceHash uint64) (bool, []algorithm.PredicateFailureReason, bool) {
ec.RLock()
defer ec.RUnlock()
glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, pod.GetName(), nodeName)
if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
predicateMap := cachePredicate.(PredicateMap)
// TODO(resouer) Is it possible a race that cache failed to update immediately?
if hostPredicate, ok := predicateMap[equivalenceHash]; ok {
if hostPredicate.Fit {
fitNodeList = append(fitNodeList, node)
return true, []algorithm.PredicateFailureReason{}, false
} else {
failedPredicates[node.Name] = hostPredicate.FailReasons
return false, hostPredicate.FailReasons, false
}
findCache = true
} else {
// is invalid
return false, []algorithm.PredicateFailureReason{}, true
}
}
if !findCache {
noCacheNodeList = append(noCacheNodeList, node)
}
return false, []algorithm.PredicateFailureReason{}, true
}
// InvalidateCachedPredicateItem marks all items of given predicateKeys, of all pods, on the given node as invalid
func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
ec.Lock()
defer ec.Unlock()
if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
for predicateKey := range predicateKeys {
algorithmCache.predicatesCache.Remove(predicateKey)
}
}
return fitNodeList, failedPredicates, noCacheNodeList
glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName)
}
// SendInvalidAlgorithmCacheReq marks AlgorithmCache item as invalid
func (ec *EquivalenceCache) SendInvalidAlgorithmCacheReq(nodeName string) {
ec.Lock()
defer ec.Unlock()
// clear the cache of this node
delete(ec.algorithmCache, nodeName)
}
// SendClearAllCacheReq marks all cached item as invalid
func (ec *EquivalenceCache) SendClearAllCacheReq() {
ec.Lock()
defer ec.Unlock()
// clear cache of all nodes
for nodeName := range ec.algorithmCache {
delete(ec.algorithmCache, nodeName)
// InvalidateCachedPredicateItemOfAllNodes marks all items of given predicateKeys, of all pods, on all node as invalid
func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
ec.Lock()
defer ec.Unlock()
// algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates
for _, algorithmCache := range ec.algorithmCache {
for predicateKey := range predicateKeys {
// just use keys is enough
algorithmCache.predicatesCache.Remove(predicateKey)
}
}
glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys)
}
// hashEquivalencePod returns the hash of equivalence pod.
func (ec *EquivalenceCache) hashEquivalencePod(pod *v1.Pod) uint64 {
equivalencePod := ec.getEquivalencePod(pod)
hash := fnv.New32a()
hashutil.DeepHashObject(hash, equivalencePod)
return uint64(hash.Sum32())
// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid
func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) {
ec.Lock()
defer ec.Unlock()
delete(ec.algorithmCache, nodeName)
glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
}
// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod, on the given node as invalid
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPod(nodeName string, predicateKeys sets.String, pod *v1.Pod) {
if len(predicateKeys) == 0 {
return
}
equivalenceHash := ec.getHashEquivalencePod(pod)
if equivalenceHash == 0 {
// no equivalence pod found, just return
return
}
ec.Lock()
defer ec.Unlock()
if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
for predicateKey := range predicateKeys {
if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
// got the cached item of by predicateKey & pod
predicateMap := cachePredicate.(PredicateMap)
delete(predicateMap, equivalenceHash)
}
}
}
glog.V(5).Infof("Done invalidating cached predicates %v on node %s, for pod %v", predicateKeys, nodeName, pod.GetName())
}
// InvalidateCachedPredicateItemForPodAdd is a wrapper of InvalidateCachedPredicateItem for pod add case
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
// MatchInterPodAffinity: we assume scheduler can make sure newly binded pod
// will not break the existing inter pod affinity. So we does not need to invalidate
// MatchInterPodAffinity when pod added.
//
// But when a pod is deleted, existing inter pod affinity may become invalid.
// (e.g. this pod was preferred by some else, or vice versa)
//
// NOTE: assumptions above will not stand when we implemented features like
// RequiredDuringSchedulingRequiredDuringExecution.
// NoDiskConflict: the newly scheduled pod fits to existing pods on this node,
// it will also fits to equivalence class of existing pods
// GeneralPredicates: will always be affected by adding a new pod
invalidPredicates := sets.NewString("GeneralPredicates")
ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
}
// getHashEquivalencePod returns the hash of equivalence pod.
// if no equivalence pod found, return 0
func (ec *EquivalenceCache) getHashEquivalencePod(pod *v1.Pod) uint64 {
equivalencePod := ec.getEquivalencePod(pod)
if equivalencePod != nil {
hash := fnv.New32a()
hashutil.DeepHashObject(hash, equivalencePod)
return uint64(hash.Sum32())
}
return 0
}

View File

@ -0,0 +1,131 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"reflect"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)
func TestUpdateCachedPredicateItem(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
predicateKey string
nodeName string
fit bool
reasons []algorithm.PredicateFailureReason
equivalenceHash uint64
expectCacheItem HostPredicate
}{
{
name: "test 1",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}},
predicateKey: "GeneralPredicates",
nodeName: "node1",
fit: true,
equivalenceHash: 123,
expectCacheItem: HostPredicate{
Fit: true,
},
},
}
for _, test := range tests {
// this case does not need to calculate equivalence hash, just pass an empty function
fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil }
ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc)
ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.fit, test.reasons, test.equivalenceHash)
value, ok := ecache.algorithmCache[test.nodeName].predicatesCache.Get(test.predicateKey)
if !ok {
t.Errorf("Failed : %s, can't find expected cache item: %v", test.name, test.expectCacheItem)
} else {
cachedMapItem := value.(PredicateMap)
if !reflect.DeepEqual(cachedMapItem[test.equivalenceHash], test.expectCacheItem) {
t.Errorf("Failed : %s, expected cached item: %v, but got: %v", test.name, test.expectCacheItem, cachedMapItem[test.equivalenceHash])
}
}
}
}
type predicateItemType struct {
fit bool
reasons []algorithm.PredicateFailureReason
}
func TestInvalidateCachedPredicateItem(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
nodeName string
predicateKey string
equivalenceHash uint64
cachedItem predicateItemType
expectedInvalid bool
expectedPredicateItem predicateItemType
}{
{
name: "test 1",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}},
nodeName: "node1",
equivalenceHash: 123,
predicateKey: "GeneralPredicates",
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
},
expectedInvalid: true,
expectedPredicateItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{},
},
},
}
for _, test := range tests {
// this case does not need to calculate equivalence hash, just pass an empty function
fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil }
ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc)
// set cached item to equivalence cache
ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHash)
// if we want to do invalid, invalid the cached item
if test.expectedInvalid {
predicateKeys := sets.NewString()
predicateKeys.Insert(test.predicateKey)
ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys)
}
// calculate predicate with equivalence cache
fit, reasons, invalid := ecache.PredicateWithECache(test.pod, test.nodeName, test.predicateKey, test.equivalenceHash)
// returned invalid should match expectedInvalid
if invalid != test.expectedInvalid {
t.Errorf("Failed : %s, expected invalid: %v, but got: %v", test.name, test.expectedInvalid, invalid)
}
// returned predicate result should match expected predicate item
if fit != test.expectedPredicateItem.fit {
t.Errorf("Failed : %s, expected fit: %v, but got: %v", test.name, test.cachedItem.fit, fit)
}
if !reflect.DeepEqual(reasons, test.expectedPredicateItem.reasons) {
t.Errorf("Failed : %s, expected reasons: %v, but got: %v", test.name, test.cachedItem.reasons, reasons)
}
}
}