Merge pull request #41541 from resouer/eclass-algorithm

Automatic merge from submit-queue

Update algorithm of equivalence class cache predicates

NOTE: This is the first two commits of #36238

**What's in this PR:**

1. Definition of equivalence class
2. An update of `equivalence_cache.go` algorithms to implement enable/disable equivalence cache for individual predicate
3. Added equivalence class data structure to `Generic Scheduler` but did not initialize it. This is used to show how we will use equivalence class when scheduling.

**Why I did this:**

Although #36238 has been finished for a period of time, we found it's still very hard to review, because it mixed 1) definition of equivalence class, 2) how to use equivalence cache, and 3) how to keep this cache up-to-date, 4) e2e tests to verify (3) works.  

So reviewers are easily distracted by different technical points like `hash algorithms`, `how to properly use Informer` etc, left the more important equivalence algorithms untouched.

So this PR I only includes 1) and 2), leaves updating this cache in #36238. I can see this part is totally independent from the rest part of it. So we can definitely review this equivalence  strategies first.

cc @kubernetes/sig-scheduling-pr-reviews @davidopp @jayunit100 @wojtek-t
This commit is contained in:
Kubernetes Submit Queue 2017-03-28 06:26:39 -07:00 committed by GitHub
commit 6d7824d8a6
11 changed files with 344 additions and 86 deletions

View File

@ -43,6 +43,7 @@ go_library(
"//pkg/client/listers/core/v1:go_default_library", "//pkg/client/listers/core/v1:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/api:go_default_library", "//plugin/pkg/scheduler/api:go_default_library",
"//plugin/pkg/scheduler/core:go_default_library",
"//plugin/pkg/scheduler/metrics:go_default_library", "//plugin/pkg/scheduler/metrics:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library",
"//plugin/pkg/scheduler/util:go_default_library", "//plugin/pkg/scheduler/util:go_default_library",

View File

@ -247,22 +247,21 @@ func copyAndReplace(set sets.String, replaceWhat, replaceWith string) sets.Strin
// GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused. // GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused.
func GetEquivalencePod(pod *v1.Pod) interface{} { func GetEquivalencePod(pod *v1.Pod) interface{} {
equivalencePod := EquivalencePod{}
// For now we only consider pods: // For now we only consider pods:
// 1. OwnerReferences is Controller // 1. OwnerReferences is Controller
// 2. OwnerReferences kind is in valid controller kinds // 2. with same OwnerReferences
// 3. with same OwnerReferences
// to be equivalent // to be equivalent
if len(pod.OwnerReferences) != 0 { if len(pod.OwnerReferences) != 0 {
for _, ref := range pod.OwnerReferences { for _, ref := range pod.OwnerReferences {
if *ref.Controller { if *ref.Controller {
equivalencePod.ControllerRef = ref
// a pod can only belongs to one controller // a pod can only belongs to one controller
break return &EquivalencePod{
ControllerRef: ref,
}
} }
} }
} }
return &equivalencePod return nil
} }
// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods. // EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.

View File

@ -11,6 +11,7 @@ load(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"equivalence_cache_test.go",
"extender_test.go", "extender_test.go",
"generic_scheduler_test.go", "generic_scheduler_test.go",
], ],
@ -53,6 +54,7 @@ go_library(
"//vendor:github.com/golang/groupcache/lru", "//vendor:github.com/golang/groupcache/lru",
"//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/net", "//vendor:k8s.io/apimachinery/pkg/util/net",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apiserver/pkg/util/trace", "//vendor:k8s.io/apiserver/pkg/util/trace",
"//vendor:k8s.io/client-go/rest", "//vendor:k8s.io/client-go/rest",
"//vendor:k8s.io/client-go/util/workqueue", "//vendor:k8s.io/client-go/util/workqueue",

View File

@ -18,18 +18,19 @@ package core
import ( import (
"hash/fnv" "hash/fnv"
"github.com/golang/groupcache/lru"
"sync" "sync"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
hashutil "k8s.io/kubernetes/pkg/util/hash" hashutil "k8s.io/kubernetes/pkg/util/hash"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"github.com/golang/glog"
"github.com/golang/groupcache/lru"
) )
// TODO(harryz) figure out the right number for this, 4096 may be too big // we use predicate names as cache's key, its count is limited
const maxCacheEntries = 4096 const maxCacheEntries = 100
type HostPredicate struct { type HostPredicate struct {
Fit bool Fit bool
@ -41,6 +42,9 @@ type AlgorithmCache struct {
predicatesCache *lru.Cache predicatesCache *lru.Cache
} }
// PredicateMap use equivalence hash as key
type PredicateMap map[uint64]HostPredicate
func newAlgorithmCache() AlgorithmCache { func newAlgorithmCache() AlgorithmCache {
return AlgorithmCache{ return AlgorithmCache{
predicatesCache: lru.New(maxCacheEntries), predicatesCache: lru.New(maxCacheEntries),
@ -61,74 +65,151 @@ func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc)
} }
} }
// addPodPredicate adds pod predicate for equivalence class // UpdateCachedPredicateItem updates pod predicate for equivalence class
func (ec *EquivalenceCache) addPodPredicate(podKey uint64, nodeName string, fit bool, failReasons []algorithm.PredicateFailureReason) { func (ec *EquivalenceCache) UpdateCachedPredicateItem(pod *v1.Pod, nodeName, predicateKey string, fit bool, reasons []algorithm.PredicateFailureReason, equivalenceHash uint64) {
ec.Lock()
defer ec.Unlock()
if _, exist := ec.algorithmCache[nodeName]; !exist { if _, exist := ec.algorithmCache[nodeName]; !exist {
ec.algorithmCache[nodeName] = newAlgorithmCache() ec.algorithmCache[nodeName] = newAlgorithmCache()
} }
ec.algorithmCache[nodeName].predicatesCache.Add(podKey, HostPredicate{Fit: fit, FailReasons: failReasons}) predicateItem := HostPredicate{
Fit: fit,
FailReasons: reasons,
}
// if cached predicate map already exists, just update the predicate by key
if v, ok := ec.algorithmCache[nodeName].predicatesCache.Get(predicateKey); ok {
predicateMap := v.(PredicateMap)
// maps in golang are references, no need to add them back
predicateMap[equivalenceHash] = predicateItem
} else {
ec.algorithmCache[nodeName].predicatesCache.Add(predicateKey,
PredicateMap{
equivalenceHash: predicateItem,
})
}
glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, pod.GetName(), nodeName, predicateItem)
} }
// AddPodPredicatesCache cache pod predicate for equivalence class // PredicateWithECache returns:
func (ec *EquivalenceCache) AddPodPredicatesCache(pod *v1.Pod, fitNodeList []*v1.Node, failedPredicates *FailedPredicateMap) { // 1. if fit
equivalenceHash := ec.hashEquivalencePod(pod) // 2. reasons if not fit
// 3. if this cache is invalid
for _, fitNode := range fitNodeList { // based on cached predicate results
ec.addPodPredicate(equivalenceHash, fitNode.Name, true, nil) func (ec *EquivalenceCache) PredicateWithECache(pod *v1.Pod, nodeName, predicateKey string, equivalenceHash uint64) (bool, []algorithm.PredicateFailureReason, bool) {
} ec.RLock()
for failNodeName, failReasons := range *failedPredicates { defer ec.RUnlock()
ec.addPodPredicate(equivalenceHash, failNodeName, false, failReasons) glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, pod.GetName(), nodeName)
} if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
} if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
predicateMap := cachePredicate.(PredicateMap)
// GetCachedPredicates gets cached predicates for equivalence class // TODO(resouer) Is it possible a race that cache failed to update immediately?
func (ec *EquivalenceCache) GetCachedPredicates(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, []*v1.Node) { if hostPredicate, ok := predicateMap[equivalenceHash]; ok {
fitNodeList := []*v1.Node{}
failedPredicates := FailedPredicateMap{}
noCacheNodeList := []*v1.Node{}
equivalenceHash := ec.hashEquivalencePod(pod)
for _, node := range nodes {
findCache := false
if algorithmCache, exist := ec.algorithmCache[node.Name]; exist {
if cachePredicate, exist := algorithmCache.predicatesCache.Get(equivalenceHash); exist {
hostPredicate := cachePredicate.(HostPredicate)
if hostPredicate.Fit { if hostPredicate.Fit {
fitNodeList = append(fitNodeList, node) return true, []algorithm.PredicateFailureReason{}, false
} else { } else {
failedPredicates[node.Name] = hostPredicate.FailReasons return false, hostPredicate.FailReasons, false
} }
findCache = true } else {
// is invalid
return false, []algorithm.PredicateFailureReason{}, true
} }
} }
if !findCache { }
noCacheNodeList = append(noCacheNodeList, node) return false, []algorithm.PredicateFailureReason{}, true
}
// InvalidateCachedPredicateItem marks all items of given predicateKeys, of all pods, on the given node as invalid
func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) {
if len(predicateKeys) == 0 {
return
}
ec.Lock()
defer ec.Unlock()
if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
for predicateKey := range predicateKeys {
algorithmCache.predicatesCache.Remove(predicateKey)
} }
} }
return fitNodeList, failedPredicates, noCacheNodeList glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName)
} }
// SendInvalidAlgorithmCacheReq marks AlgorithmCache item as invalid // InvalidateCachedPredicateItemOfAllNodes marks all items of given predicateKeys, of all pods, on all node as invalid
func (ec *EquivalenceCache) SendInvalidAlgorithmCacheReq(nodeName string) { func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) {
ec.Lock() if len(predicateKeys) == 0 {
defer ec.Unlock() return
// clear the cache of this node
delete(ec.algorithmCache, nodeName)
}
// SendClearAllCacheReq marks all cached item as invalid
func (ec *EquivalenceCache) SendClearAllCacheReq() {
ec.Lock()
defer ec.Unlock()
// clear cache of all nodes
for nodeName := range ec.algorithmCache {
delete(ec.algorithmCache, nodeName)
} }
ec.Lock()
defer ec.Unlock()
// algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates
for _, algorithmCache := range ec.algorithmCache {
for predicateKey := range predicateKeys {
// just use keys is enough
algorithmCache.predicatesCache.Remove(predicateKey)
}
}
glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys)
} }
// hashEquivalencePod returns the hash of equivalence pod. // InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid
func (ec *EquivalenceCache) hashEquivalencePod(pod *v1.Pod) uint64 { func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) {
equivalencePod := ec.getEquivalencePod(pod) ec.Lock()
hash := fnv.New32a() defer ec.Unlock()
hashutil.DeepHashObject(hash, equivalencePod) delete(ec.algorithmCache, nodeName)
return uint64(hash.Sum32()) glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
}
// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod, on the given node as invalid
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPod(nodeName string, predicateKeys sets.String, pod *v1.Pod) {
if len(predicateKeys) == 0 {
return
}
equivalenceHash := ec.getHashEquivalencePod(pod)
if equivalenceHash == 0 {
// no equivalence pod found, just return
return
}
ec.Lock()
defer ec.Unlock()
if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
for predicateKey := range predicateKeys {
if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
// got the cached item of by predicateKey & pod
predicateMap := cachePredicate.(PredicateMap)
delete(predicateMap, equivalenceHash)
}
}
}
glog.V(5).Infof("Done invalidating cached predicates %v on node %s, for pod %v", predicateKeys, nodeName, pod.GetName())
}
// InvalidateCachedPredicateItemForPodAdd is a wrapper of InvalidateCachedPredicateItem for pod add case
func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
// MatchInterPodAffinity: we assume scheduler can make sure newly binded pod
// will not break the existing inter pod affinity. So we does not need to invalidate
// MatchInterPodAffinity when pod added.
//
// But when a pod is deleted, existing inter pod affinity may become invalid.
// (e.g. this pod was preferred by some else, or vice versa)
//
// NOTE: assumptions above will not stand when we implemented features like
// RequiredDuringSchedulingRequiredDuringExecution.
// NoDiskConflict: the newly scheduled pod fits to existing pods on this node,
// it will also fits to equivalence class of existing pods
// GeneralPredicates: will always be affected by adding a new pod
invalidPredicates := sets.NewString("GeneralPredicates")
ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
}
// getHashEquivalencePod returns the hash of equivalence pod.
// if no equivalence pod found, return 0
func (ec *EquivalenceCache) getHashEquivalencePod(pod *v1.Pod) uint64 {
equivalencePod := ec.getEquivalencePod(pod)
if equivalencePod != nil {
hash := fnv.New32a()
hashutil.DeepHashObject(hash, equivalencePod)
return uint64(hash.Sum32())
}
return 0
} }

View File

@ -0,0 +1,131 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package core
import (
"reflect"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)
func TestUpdateCachedPredicateItem(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
predicateKey string
nodeName string
fit bool
reasons []algorithm.PredicateFailureReason
equivalenceHash uint64
expectCacheItem HostPredicate
}{
{
name: "test 1",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}},
predicateKey: "GeneralPredicates",
nodeName: "node1",
fit: true,
equivalenceHash: 123,
expectCacheItem: HostPredicate{
Fit: true,
},
},
}
for _, test := range tests {
// this case does not need to calculate equivalence hash, just pass an empty function
fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil }
ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc)
ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.fit, test.reasons, test.equivalenceHash)
value, ok := ecache.algorithmCache[test.nodeName].predicatesCache.Get(test.predicateKey)
if !ok {
t.Errorf("Failed : %s, can't find expected cache item: %v", test.name, test.expectCacheItem)
} else {
cachedMapItem := value.(PredicateMap)
if !reflect.DeepEqual(cachedMapItem[test.equivalenceHash], test.expectCacheItem) {
t.Errorf("Failed : %s, expected cached item: %v, but got: %v", test.name, test.expectCacheItem, cachedMapItem[test.equivalenceHash])
}
}
}
}
type predicateItemType struct {
fit bool
reasons []algorithm.PredicateFailureReason
}
func TestInvalidateCachedPredicateItem(t *testing.T) {
tests := []struct {
name string
pod *v1.Pod
nodeName string
predicateKey string
equivalenceHash uint64
cachedItem predicateItemType
expectedInvalid bool
expectedPredicateItem predicateItemType
}{
{
name: "test 1",
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}},
nodeName: "node1",
equivalenceHash: 123,
predicateKey: "GeneralPredicates",
cachedItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
},
expectedInvalid: true,
expectedPredicateItem: predicateItemType{
fit: false,
reasons: []algorithm.PredicateFailureReason{},
},
},
}
for _, test := range tests {
// this case does not need to calculate equivalence hash, just pass an empty function
fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil }
ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc)
// set cached item to equivalence cache
ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHash)
// if we want to do invalid, invalid the cached item
if test.expectedInvalid {
predicateKeys := sets.NewString()
predicateKeys.Insert(test.predicateKey)
ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys)
}
// calculate predicate with equivalence cache
fit, reasons, invalid := ecache.PredicateWithECache(test.pod, test.nodeName, test.predicateKey, test.equivalenceHash)
// returned invalid should match expectedInvalid
if invalid != test.expectedInvalid {
t.Errorf("Failed : %s, expected invalid: %v, but got: %v", test.name, test.expectedInvalid, invalid)
}
// returned predicate result should match expected predicate item
if fit != test.expectedPredicateItem.fit {
t.Errorf("Failed : %s, expected fit: %v, but got: %v", test.name, test.cachedItem.fit, fit)
}
if !reflect.DeepEqual(reasons, test.expectedPredicateItem.reasons) {
t.Errorf("Failed : %s, expected reasons: %v, but got: %v", test.name, test.cachedItem.reasons, reasons)
}
}
}

View File

@ -293,7 +293,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}) cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
} }
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders) cache, nil, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
podIgnored := &v1.Pod{} podIgnored := &v1.Pod{}
machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr { if test.expectsErr {

View File

@ -69,6 +69,7 @@ func (f *FitError) Error() string {
type genericScheduler struct { type genericScheduler struct {
cache schedulercache.Cache cache schedulercache.Cache
equivalenceCache *EquivalenceCache
predicates map[string]algorithm.FitPredicate predicates map[string]algorithm.FitPredicate
priorityMetaProducer algorithm.MetadataProducer priorityMetaProducer algorithm.MetadataProducer
predicateMetaProducer algorithm.MetadataProducer predicateMetaProducer algorithm.MetadataProducer
@ -79,8 +80,6 @@ type genericScheduler struct {
lastNodeIndex uint64 lastNodeIndex uint64
cachedNodeInfoMap map[string]*schedulercache.NodeInfo cachedNodeInfoMap map[string]*schedulercache.NodeInfo
equivalenceCache *EquivalenceCache
} }
// Schedule tries to schedule the given pod to one of node in the node list. // Schedule tries to schedule the given pod to one of node in the node list.
@ -104,10 +103,8 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
return "", err return "", err
} }
// TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here
trace.Step("Computing predicates") trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer) filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -158,6 +155,7 @@ func findNodesThatFit(
predicateFuncs map[string]algorithm.FitPredicate, predicateFuncs map[string]algorithm.FitPredicate,
extenders []algorithm.SchedulerExtender, extenders []algorithm.SchedulerExtender,
metadataProducer algorithm.MetadataProducer, metadataProducer algorithm.MetadataProducer,
ecache *EquivalenceCache,
) ([]*v1.Node, FailedPredicateMap, error) { ) ([]*v1.Node, FailedPredicateMap, error) {
var filtered []*v1.Node var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{} failedPredicateMap := FailedPredicateMap{}
@ -176,7 +174,7 @@ func findNodesThatFit(
meta := metadataProducer(pod, nodeNameToInfo) meta := metadataProducer(pod, nodeNameToInfo)
checkNode := func(i int) { checkNode := func(i int) {
nodeName := nodes[i].Name nodeName := nodes[i].Name
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs) fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache)
if err != nil { if err != nil {
predicateResultLock.Lock() predicateResultLock.Lock()
errs = append(errs, err) errs = append(errs, err)
@ -221,15 +219,45 @@ func findNodesThatFit(
} }
// Checks whether node with a given name and NodeInfo satisfies all predicateFuncs. // Checks whether node with a given name and NodeInfo satisfies all predicateFuncs.
func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, []algorithm.PredicateFailureReason, error) { func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate,
var failedPredicates []algorithm.PredicateFailureReason ecache *EquivalenceCache) (bool, []algorithm.PredicateFailureReason, error) {
for _, predicate := range predicateFuncs { var (
fit, reasons, err := predicate(pod, meta, info) equivalenceHash uint64
if err != nil { failedPredicates []algorithm.PredicateFailureReason
err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err) eCacheAvailable bool
return false, []algorithm.PredicateFailureReason{}, err invalid bool
fit bool
reasons []algorithm.PredicateFailureReason
err error
)
if ecache != nil {
// getHashEquivalencePod will return immediately if no equivalence pod found
equivalenceHash = ecache.getHashEquivalencePod(pod)
eCacheAvailable = (equivalenceHash != 0)
}
for predicateKey, predicate := range predicateFuncs {
// If equivalenceCache is available
if eCacheAvailable {
// PredicateWithECache will returns it's cached predicate results
fit, reasons, invalid = ecache.PredicateWithECache(pod, info.Node().GetName(), predicateKey, equivalenceHash)
} }
if !eCacheAvailable || invalid {
// we need to execute predicate functions since equivalence cache does not work
fit, reasons, err = predicate(pod, meta, info)
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
if eCacheAvailable {
// update equivalence cache with newly computed fit & reasons
// TODO(resouer) should we do this in another thread? any race?
ecache.UpdateCachedPredicateItem(pod, info.Node().GetName(), predicateKey, fit, reasons, equivalenceHash)
}
}
if !fit { if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...) failedPredicates = append(failedPredicates, reasons...)
} }
} }
@ -386,6 +414,7 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf
func NewGenericScheduler( func NewGenericScheduler(
cache schedulercache.Cache, cache schedulercache.Cache,
eCache *EquivalenceCache,
predicates map[string]algorithm.FitPredicate, predicates map[string]algorithm.FitPredicate,
predicateMetaProducer algorithm.MetadataProducer, predicateMetaProducer algorithm.MetadataProducer,
prioritizers []algorithm.PriorityConfig, prioritizers []algorithm.PriorityConfig,
@ -393,6 +422,7 @@ func NewGenericScheduler(
extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm { extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
return &genericScheduler{ return &genericScheduler{
cache: cache, cache: cache,
equivalenceCache: eCache,
predicates: predicates, predicates: predicates,
predicateMetaProducer: predicateMetaProducer, predicateMetaProducer: predicateMetaProducer,
prioritizers: prioritizers, prioritizers: prioritizers,

View File

@ -307,7 +307,7 @@ func TestGenericScheduler(t *testing.T) {
} }
scheduler := NewGenericScheduler( scheduler := NewGenericScheduler(
cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, cache, nil, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer,
[]algorithm.SchedulerExtender{}) []algorithm.SchedulerExtender{})
machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
@ -328,7 +328,7 @@ func TestFindFitAllError(t *testing.T) {
"2": schedulercache.NewNodeInfo(), "2": schedulercache.NewNodeInfo(),
"1": schedulercache.NewNodeInfo(), "1": schedulercache.NewNodeInfo(),
} }
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer) _, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer, nil)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
@ -362,7 +362,7 @@ func TestFindFitSomeError(t *testing.T) {
nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}) nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
} }
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer) _, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer, nil)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }

View File

@ -189,7 +189,7 @@ func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister {
return c.scheduledPodLister return c.scheduledPodLister
} }
// TODO(harryz) need to update all the handlers here and below for equivalence cache // TODO(resouer) need to update all the handlers here and below for equivalence cache
func (c *ConfigFactory) addPodToCache(obj interface{}) { func (c *ConfigFactory) addPodToCache(obj interface{}) {
pod, ok := obj.(*v1.Pod) pod, ok := obj.(*v1.Pod)
if !ok { if !ok {
@ -370,7 +370,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
} }
f.Run() f.Run()
algo := core.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders) // TODO(resouer) use equivalence cache instead of nil here when #36238 get merged
algo := core.NewGenericScheduler(f.schedulerCache, nil, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
podBackoff := util.CreateDefaultPodBackoff() podBackoff := util.CreateDefaultPodBackoff()
return &scheduler.Config{ return &scheduler.Config{
SchedulerCache: f.schedulerCache, SchedulerCache: f.schedulerCache,

View File

@ -27,6 +27,7 @@ import (
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
"k8s.io/kubernetes/plugin/pkg/scheduler/metrics" "k8s.io/kubernetes/plugin/pkg/scheduler/metrics"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"k8s.io/kubernetes/plugin/pkg/scheduler/util" "k8s.io/kubernetes/plugin/pkg/scheduler/util"
@ -92,9 +93,12 @@ type Config struct {
// It is expected that changes made via SchedulerCache will be observed // It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm. // by NodeLister and Algorithm.
SchedulerCache schedulercache.Cache SchedulerCache schedulercache.Cache
NodeLister algorithm.NodeLister // Ecache is used for optimistically invalid affected cache items after
Algorithm algorithm.ScheduleAlgorithm // successfully binding a pod
Binder Binder Ecache *core.EquivalenceCache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
Binder Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed // PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind // with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic. // handler so that binding and setting PodCondition it is atomic.
@ -193,6 +197,13 @@ func (sched *Scheduler) scheduleOne() {
return return
} }
// Optimistically assume that the binding will succeed, so we need to invalidate affected
// predicates in equivalence cache.
// If the binding fails, these invalidated item will not break anything.
if sched.config.Ecache != nil {
sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(pod, dest)
}
go func() { go func() {
defer metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) defer metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))

View File

@ -480,6 +480,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate) (*Scheduler, chan *v1.Binding, chan error) { func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate) (*Scheduler, chan *v1.Binding, chan error) {
algo := core.NewGenericScheduler( algo := core.NewGenericScheduler(
scache, scache,
nil,
predicateMap, predicateMap,
algorithm.EmptyMetadataProducer, algorithm.EmptyMetadataProducer,
[]algorithm.PriorityConfig{}, []algorithm.PriorityConfig{},
@ -510,6 +511,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
algo := core.NewGenericScheduler( algo := core.NewGenericScheduler(
scache, scache,
nil,
predicateMap, predicateMap,
algorithm.EmptyMetadataProducer, algorithm.EmptyMetadataProducer,
[]algorithm.PriorityConfig{}, []algorithm.PriorityConfig{},