From 819554f514ab697db982fae355ef93180ef1e82c Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Wed, 15 Feb 2017 16:55:02 +0800 Subject: [PATCH 1/3] Update equivalence cache to use predicate as key Remove Invalid field from host predicate --- .../algorithmprovider/defaults/defaults.go | 11 +- .../pkg/scheduler/core/equivalence_cache.go | 197 ++++++++++++------ .../scheduler/core/equivalence_cache_test.go | 131 ++++++++++++ 3 files changed, 275 insertions(+), 64 deletions(-) create mode 100644 plugin/pkg/scheduler/core/equivalence_cache_test.go diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 6b1f6550936..5acb2f6b66c 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -247,22 +247,21 @@ func copyAndReplace(set sets.String, replaceWhat, replaceWith string) sets.Strin // GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused. func GetEquivalencePod(pod *v1.Pod) interface{} { - equivalencePod := EquivalencePod{} // For now we only consider pods: // 1. OwnerReferences is Controller - // 2. OwnerReferences kind is in valid controller kinds - // 3. with same OwnerReferences + // 2. with same OwnerReferences // to be equivalent if len(pod.OwnerReferences) != 0 { for _, ref := range pod.OwnerReferences { if *ref.Controller { - equivalencePod.ControllerRef = ref // a pod can only belongs to one controller - break + return &EquivalencePod{ + ControllerRef: ref, + } } } } - return &equivalencePod + return nil } // EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods. diff --git a/plugin/pkg/scheduler/core/equivalence_cache.go b/plugin/pkg/scheduler/core/equivalence_cache.go index 7e4ebfd660f..27e43b1f9c6 100644 --- a/plugin/pkg/scheduler/core/equivalence_cache.go +++ b/plugin/pkg/scheduler/core/equivalence_cache.go @@ -18,18 +18,19 @@ package core import ( "hash/fnv" - - "github.com/golang/groupcache/lru" - "sync" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api/v1" hashutil "k8s.io/kubernetes/pkg/util/hash" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + + "github.com/golang/glog" + "github.com/golang/groupcache/lru" ) -// TODO(harryz) figure out the right number for this, 4096 may be too big -const maxCacheEntries = 4096 +// we use predicate names as cache's key, its count is limited +const maxCacheEntries = 100 type HostPredicate struct { Fit bool @@ -41,6 +42,9 @@ type AlgorithmCache struct { predicatesCache *lru.Cache } +// PredicateMap use equivalence hash as key +type PredicateMap map[uint64]HostPredicate + func newAlgorithmCache() AlgorithmCache { return AlgorithmCache{ predicatesCache: lru.New(maxCacheEntries), @@ -61,74 +65,151 @@ func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) } } -// addPodPredicate adds pod predicate for equivalence class -func (ec *EquivalenceCache) addPodPredicate(podKey uint64, nodeName string, fit bool, failReasons []algorithm.PredicateFailureReason) { +// UpdateCachedPredicateItem updates pod predicate for equivalence class +func (ec *EquivalenceCache) UpdateCachedPredicateItem(pod *v1.Pod, nodeName, predicateKey string, fit bool, reasons []algorithm.PredicateFailureReason, equivalenceHash uint64) { + ec.Lock() + defer ec.Unlock() if _, exist := ec.algorithmCache[nodeName]; !exist { ec.algorithmCache[nodeName] = newAlgorithmCache() } - ec.algorithmCache[nodeName].predicatesCache.Add(podKey, HostPredicate{Fit: fit, FailReasons: failReasons}) + predicateItem := HostPredicate{ + Fit: fit, + FailReasons: reasons, + } + // if cached predicate map already exists, just update the predicate by key + if v, ok := ec.algorithmCache[nodeName].predicatesCache.Get(predicateKey); ok { + predicateMap := v.(PredicateMap) + // maps in golang are references, no need to add them back + predicateMap[equivalenceHash] = predicateItem + } else { + ec.algorithmCache[nodeName].predicatesCache.Add(predicateKey, + PredicateMap{ + equivalenceHash: predicateItem, + }) + } + glog.V(5).Infof("Updated cached predicate: %v for pod: %v on node: %s, with item %v", predicateKey, pod.GetName(), nodeName, predicateItem) } -// AddPodPredicatesCache cache pod predicate for equivalence class -func (ec *EquivalenceCache) AddPodPredicatesCache(pod *v1.Pod, fitNodeList []*v1.Node, failedPredicates *FailedPredicateMap) { - equivalenceHash := ec.hashEquivalencePod(pod) - - for _, fitNode := range fitNodeList { - ec.addPodPredicate(equivalenceHash, fitNode.Name, true, nil) - } - for failNodeName, failReasons := range *failedPredicates { - ec.addPodPredicate(equivalenceHash, failNodeName, false, failReasons) - } -} - -// GetCachedPredicates gets cached predicates for equivalence class -func (ec *EquivalenceCache) GetCachedPredicates(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, []*v1.Node) { - fitNodeList := []*v1.Node{} - failedPredicates := FailedPredicateMap{} - noCacheNodeList := []*v1.Node{} - equivalenceHash := ec.hashEquivalencePod(pod) - for _, node := range nodes { - findCache := false - if algorithmCache, exist := ec.algorithmCache[node.Name]; exist { - if cachePredicate, exist := algorithmCache.predicatesCache.Get(equivalenceHash); exist { - hostPredicate := cachePredicate.(HostPredicate) +// PredicateWithECache returns: +// 1. if fit +// 2. reasons if not fit +// 3. if this cache is invalid +// based on cached predicate results +func (ec *EquivalenceCache) PredicateWithECache(pod *v1.Pod, nodeName, predicateKey string, equivalenceHash uint64) (bool, []algorithm.PredicateFailureReason, bool) { + ec.RLock() + defer ec.RUnlock() + glog.V(5).Infof("Begin to calculate predicate: %v for pod: %s on node: %s based on equivalence cache", predicateKey, pod.GetName(), nodeName) + if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { + if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist { + predicateMap := cachePredicate.(PredicateMap) + // TODO(resouer) Is it possible a race that cache failed to update immediately? + if hostPredicate, ok := predicateMap[equivalenceHash]; ok { if hostPredicate.Fit { - fitNodeList = append(fitNodeList, node) + return true, []algorithm.PredicateFailureReason{}, false } else { - failedPredicates[node.Name] = hostPredicate.FailReasons + return false, hostPredicate.FailReasons, false } - findCache = true + } else { + // is invalid + return false, []algorithm.PredicateFailureReason{}, true } } - if !findCache { - noCacheNodeList = append(noCacheNodeList, node) + } + return false, []algorithm.PredicateFailureReason{}, true +} + +// InvalidateCachedPredicateItem marks all items of given predicateKeys, of all pods, on the given node as invalid +func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) { + if len(predicateKeys) == 0 { + return + } + ec.Lock() + defer ec.Unlock() + if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { + for predicateKey := range predicateKeys { + algorithmCache.predicatesCache.Remove(predicateKey) } } - return fitNodeList, failedPredicates, noCacheNodeList + glog.V(5).Infof("Done invalidating cached predicates: %v on node: %s", predicateKeys, nodeName) } -// SendInvalidAlgorithmCacheReq marks AlgorithmCache item as invalid -func (ec *EquivalenceCache) SendInvalidAlgorithmCacheReq(nodeName string) { - ec.Lock() - defer ec.Unlock() - // clear the cache of this node - delete(ec.algorithmCache, nodeName) -} - -// SendClearAllCacheReq marks all cached item as invalid -func (ec *EquivalenceCache) SendClearAllCacheReq() { - ec.Lock() - defer ec.Unlock() - // clear cache of all nodes - for nodeName := range ec.algorithmCache { - delete(ec.algorithmCache, nodeName) +// InvalidateCachedPredicateItemOfAllNodes marks all items of given predicateKeys, of all pods, on all node as invalid +func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) { + if len(predicateKeys) == 0 { + return } + ec.Lock() + defer ec.Unlock() + // algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates + for _, algorithmCache := range ec.algorithmCache { + for predicateKey := range predicateKeys { + // just use keys is enough + algorithmCache.predicatesCache.Remove(predicateKey) + } + } + glog.V(5).Infof("Done invalidating cached predicates: %v on all node", predicateKeys) } -// hashEquivalencePod returns the hash of equivalence pod. -func (ec *EquivalenceCache) hashEquivalencePod(pod *v1.Pod) uint64 { - equivalencePod := ec.getEquivalencePod(pod) - hash := fnv.New32a() - hashutil.DeepHashObject(hash, equivalencePod) - return uint64(hash.Sum32()) +// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid +func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) { + ec.Lock() + defer ec.Unlock() + delete(ec.algorithmCache, nodeName) + glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName) +} + +// InvalidateCachedPredicateItemForPod marks item of given predicateKeys, of given pod, on the given node as invalid +func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPod(nodeName string, predicateKeys sets.String, pod *v1.Pod) { + if len(predicateKeys) == 0 { + return + } + equivalenceHash := ec.getHashEquivalencePod(pod) + if equivalenceHash == 0 { + // no equivalence pod found, just return + return + } + ec.Lock() + defer ec.Unlock() + if algorithmCache, exist := ec.algorithmCache[nodeName]; exist { + for predicateKey := range predicateKeys { + if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist { + // got the cached item of by predicateKey & pod + predicateMap := cachePredicate.(PredicateMap) + delete(predicateMap, equivalenceHash) + } + } + } + glog.V(5).Infof("Done invalidating cached predicates %v on node %s, for pod %v", predicateKeys, nodeName, pod.GetName()) +} + +// InvalidateCachedPredicateItemForPodAdd is a wrapper of InvalidateCachedPredicateItem for pod add case +func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) { + // MatchInterPodAffinity: we assume scheduler can make sure newly binded pod + // will not break the existing inter pod affinity. So we does not need to invalidate + // MatchInterPodAffinity when pod added. + // + // But when a pod is deleted, existing inter pod affinity may become invalid. + // (e.g. this pod was preferred by some else, or vice versa) + // + // NOTE: assumptions above will not stand when we implemented features like + // RequiredDuringSchedulingRequiredDuringExecution. + + // NoDiskConflict: the newly scheduled pod fits to existing pods on this node, + // it will also fits to equivalence class of existing pods + + // GeneralPredicates: will always be affected by adding a new pod + invalidPredicates := sets.NewString("GeneralPredicates") + ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates) +} + +// getHashEquivalencePod returns the hash of equivalence pod. +// if no equivalence pod found, return 0 +func (ec *EquivalenceCache) getHashEquivalencePod(pod *v1.Pod) uint64 { + equivalencePod := ec.getEquivalencePod(pod) + if equivalencePod != nil { + hash := fnv.New32a() + hashutil.DeepHashObject(hash, equivalencePod) + return uint64(hash.Sum32()) + } + return 0 } diff --git a/plugin/pkg/scheduler/core/equivalence_cache_test.go b/plugin/pkg/scheduler/core/equivalence_cache_test.go new file mode 100644 index 00000000000..f19b7ce7080 --- /dev/null +++ b/plugin/pkg/scheduler/core/equivalence_cache_test.go @@ -0,0 +1,131 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "reflect" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" + "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates" +) + +func TestUpdateCachedPredicateItem(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + predicateKey string + nodeName string + fit bool + reasons []algorithm.PredicateFailureReason + equivalenceHash uint64 + expectCacheItem HostPredicate + }{ + { + name: "test 1", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}}, + predicateKey: "GeneralPredicates", + nodeName: "node1", + fit: true, + equivalenceHash: 123, + expectCacheItem: HostPredicate{ + Fit: true, + }, + }, + } + for _, test := range tests { + // this case does not need to calculate equivalence hash, just pass an empty function + fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil } + ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc) + ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.fit, test.reasons, test.equivalenceHash) + + value, ok := ecache.algorithmCache[test.nodeName].predicatesCache.Get(test.predicateKey) + if !ok { + t.Errorf("Failed : %s, can't find expected cache item: %v", test.name, test.expectCacheItem) + } else { + cachedMapItem := value.(PredicateMap) + if !reflect.DeepEqual(cachedMapItem[test.equivalenceHash], test.expectCacheItem) { + t.Errorf("Failed : %s, expected cached item: %v, but got: %v", test.name, test.expectCacheItem, cachedMapItem[test.equivalenceHash]) + } + } + } +} + +type predicateItemType struct { + fit bool + reasons []algorithm.PredicateFailureReason +} + +func TestInvalidateCachedPredicateItem(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + nodeName string + predicateKey string + equivalenceHash uint64 + cachedItem predicateItemType + expectedInvalid bool + expectedPredicateItem predicateItemType + }{ + { + name: "test 1", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod"}}, + nodeName: "node1", + equivalenceHash: 123, + predicateKey: "GeneralPredicates", + cachedItem: predicateItemType{ + fit: false, + reasons: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}, + }, + expectedInvalid: true, + expectedPredicateItem: predicateItemType{ + fit: false, + reasons: []algorithm.PredicateFailureReason{}, + }, + }, + } + + for _, test := range tests { + // this case does not need to calculate equivalence hash, just pass an empty function + fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil } + ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc) + // set cached item to equivalence cache + ecache.UpdateCachedPredicateItem(test.pod, test.nodeName, test.predicateKey, test.cachedItem.fit, test.cachedItem.reasons, test.equivalenceHash) + // if we want to do invalid, invalid the cached item + if test.expectedInvalid { + predicateKeys := sets.NewString() + predicateKeys.Insert(test.predicateKey) + ecache.InvalidateCachedPredicateItem(test.nodeName, predicateKeys) + } + // calculate predicate with equivalence cache + fit, reasons, invalid := ecache.PredicateWithECache(test.pod, test.nodeName, test.predicateKey, test.equivalenceHash) + // returned invalid should match expectedInvalid + if invalid != test.expectedInvalid { + t.Errorf("Failed : %s, expected invalid: %v, but got: %v", test.name, test.expectedInvalid, invalid) + } + // returned predicate result should match expected predicate item + if fit != test.expectedPredicateItem.fit { + t.Errorf("Failed : %s, expected fit: %v, but got: %v", test.name, test.cachedItem.fit, fit) + } + if !reflect.DeepEqual(reasons, test.expectedPredicateItem.reasons) { + t.Errorf("Failed : %s, expected reasons: %v, but got: %v", test.name, test.cachedItem.reasons, reasons) + } + } +} From 2c4514c3253042abbe63f6470f18acba174c00a2 Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Wed, 15 Feb 2017 16:59:30 +0800 Subject: [PATCH 2/3] Enable equivalence cache in generic scheduler --- plugin/pkg/scheduler/core/extender_test.go | 2 +- .../pkg/scheduler/core/generic_scheduler.go | 56 ++++++++++++++----- .../scheduler/core/generic_scheduler_test.go | 6 +- plugin/pkg/scheduler/factory/factory.go | 5 +- plugin/pkg/scheduler/scheduler.go | 17 +++++- plugin/pkg/scheduler/scheduler_test.go | 2 + 6 files changed, 66 insertions(+), 22 deletions(-) diff --git a/plugin/pkg/scheduler/core/extender_test.go b/plugin/pkg/scheduler/core/extender_test.go index bcaa865d073..d9d5b83fe83 100644 --- a/plugin/pkg/scheduler/core/extender_test.go +++ b/plugin/pkg/scheduler/core/extender_test.go @@ -293,7 +293,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}) } scheduler := NewGenericScheduler( - cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders) + cache, nil, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders) podIgnored := &v1.Pod{} machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) if test.expectsErr { diff --git a/plugin/pkg/scheduler/core/generic_scheduler.go b/plugin/pkg/scheduler/core/generic_scheduler.go index 3d24aa8c515..928b7ea7c75 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler.go +++ b/plugin/pkg/scheduler/core/generic_scheduler.go @@ -69,6 +69,7 @@ func (f *FitError) Error() string { type genericScheduler struct { cache schedulercache.Cache + equivalenceCache *EquivalenceCache predicates map[string]algorithm.FitPredicate priorityMetaProducer algorithm.MetadataProducer predicateMetaProducer algorithm.MetadataProducer @@ -79,8 +80,6 @@ type genericScheduler struct { lastNodeIndex uint64 cachedNodeInfoMap map[string]*schedulercache.NodeInfo - - equivalenceCache *EquivalenceCache } // Schedule tries to schedule the given pod to one of node in the node list. @@ -104,10 +103,8 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister return "", err } - // TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here - trace.Step("Computing predicates") - filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer) + filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache) if err != nil { return "", err } @@ -158,6 +155,7 @@ func findNodesThatFit( predicateFuncs map[string]algorithm.FitPredicate, extenders []algorithm.SchedulerExtender, metadataProducer algorithm.MetadataProducer, + ecache *EquivalenceCache, ) ([]*v1.Node, FailedPredicateMap, error) { var filtered []*v1.Node failedPredicateMap := FailedPredicateMap{} @@ -176,7 +174,7 @@ func findNodesThatFit( meta := metadataProducer(pod, nodeNameToInfo) checkNode := func(i int) { nodeName := nodes[i].Name - fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs) + fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache) if err != nil { predicateResultLock.Lock() errs = append(errs, err) @@ -221,15 +219,45 @@ func findNodesThatFit( } // Checks whether node with a given name and NodeInfo satisfies all predicateFuncs. -func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, []algorithm.PredicateFailureReason, error) { - var failedPredicates []algorithm.PredicateFailureReason - for _, predicate := range predicateFuncs { - fit, reasons, err := predicate(pod, meta, info) - if err != nil { - err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err) - return false, []algorithm.PredicateFailureReason{}, err +func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, + ecache *EquivalenceCache) (bool, []algorithm.PredicateFailureReason, error) { + var ( + equivalenceHash uint64 + failedPredicates []algorithm.PredicateFailureReason + eCacheAvailable bool + invalid bool + fit bool + reasons []algorithm.PredicateFailureReason + err error + ) + if ecache != nil { + // getHashEquivalencePod will return immediately if no equivalence pod found + equivalenceHash = ecache.getHashEquivalencePod(pod) + eCacheAvailable = (equivalenceHash != 0) + } + for predicateKey, predicate := range predicateFuncs { + // If equivalenceCache is available + if eCacheAvailable { + // PredicateWithECache will returns it's cached predicate results + fit, reasons, invalid = ecache.PredicateWithECache(pod, info.Node().GetName(), predicateKey, equivalenceHash) } + + if !eCacheAvailable || invalid { + // we need to execute predicate functions since equivalence cache does not work + fit, reasons, err = predicate(pod, meta, info) + if err != nil { + return false, []algorithm.PredicateFailureReason{}, err + } + + if eCacheAvailable { + // update equivalence cache with newly computed fit & reasons + // TODO(resouer) should we do this in another thread? any race? + ecache.UpdateCachedPredicateItem(pod, info.Node().GetName(), predicateKey, fit, reasons, equivalenceHash) + } + } + if !fit { + // eCache is available and valid, and predicates result is unfit, record the fail reasons failedPredicates = append(failedPredicates, reasons...) } } @@ -386,6 +414,7 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf func NewGenericScheduler( cache schedulercache.Cache, + eCache *EquivalenceCache, predicates map[string]algorithm.FitPredicate, predicateMetaProducer algorithm.MetadataProducer, prioritizers []algorithm.PriorityConfig, @@ -393,6 +422,7 @@ func NewGenericScheduler( extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm { return &genericScheduler{ cache: cache, + equivalenceCache: eCache, predicates: predicates, predicateMetaProducer: predicateMetaProducer, prioritizers: prioritizers, diff --git a/plugin/pkg/scheduler/core/generic_scheduler_test.go b/plugin/pkg/scheduler/core/generic_scheduler_test.go index c78c2c65f5c..628587b7d57 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/core/generic_scheduler_test.go @@ -307,7 +307,7 @@ func TestGenericScheduler(t *testing.T) { } scheduler := NewGenericScheduler( - cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, + cache, nil, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}) machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) @@ -328,7 +328,7 @@ func TestFindFitAllError(t *testing.T) { "2": schedulercache.NewNodeInfo(), "1": schedulercache.NewNodeInfo(), } - _, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer) + _, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer, nil) if err != nil { t.Errorf("unexpected error: %v", err) @@ -362,7 +362,7 @@ func TestFindFitSomeError(t *testing.T) { nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}) } - _, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer) + _, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer, nil) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 5391944eeaa..91bfde1c2a5 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -189,7 +189,7 @@ func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister { return c.scheduledPodLister } -// TODO(harryz) need to update all the handlers here and below for equivalence cache +// TODO(resouer) need to update all the handlers here and below for equivalence cache func (c *ConfigFactory) addPodToCache(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { @@ -370,7 +370,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, } f.Run() - algo := core.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders) + // TODO(resouer) use equivalence cache instead of nil here when #36238 get merged + algo := core.NewGenericScheduler(f.schedulerCache, nil, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders) podBackoff := util.CreateDefaultPodBackoff() return &scheduler.Config{ SchedulerCache: f.schedulerCache, diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index f151d1e0bdb..84bc88f1587 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -27,6 +27,7 @@ import ( corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/core" "k8s.io/kubernetes/plugin/pkg/scheduler/metrics" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/util" @@ -92,9 +93,12 @@ type Config struct { // It is expected that changes made via SchedulerCache will be observed // by NodeLister and Algorithm. SchedulerCache schedulercache.Cache - NodeLister algorithm.NodeLister - Algorithm algorithm.ScheduleAlgorithm - Binder Binder + // Ecache is used for optimistically invalid affected cache items after + // successfully binding a pod + Ecache *core.EquivalenceCache + NodeLister algorithm.NodeLister + Algorithm algorithm.ScheduleAlgorithm + Binder Binder // PodConditionUpdater is used only in case of scheduling errors. If we succeed // with scheduling, PodScheduled condition will be updated in apiserver in /bind // handler so that binding and setting PodCondition it is atomic. @@ -193,6 +197,13 @@ func (sched *Scheduler) scheduleOne() { return } + // Optimistically assume that the binding will succeed, so we need to invalidate affected + // predicates in equivalence cache. + // If the binding fails, these invalidated item will not break anything. + if sched.config.Ecache != nil { + sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(pod, dest) + } + go func() { defer metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 9f9de1dc0b6..f438bd11eef 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -480,6 +480,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate) (*Scheduler, chan *v1.Binding, chan error) { algo := core.NewGenericScheduler( scache, + nil, predicateMap, algorithm.EmptyMetadataProducer, []algorithm.PriorityConfig{}, @@ -510,6 +511,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache. func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { algo := core.NewGenericScheduler( scache, + nil, predicateMap, algorithm.EmptyMetadataProducer, []algorithm.PriorityConfig{}, From 63197e53a111541ef858d94f0f182102f948d81f Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Thu, 16 Feb 2017 16:13:56 +0800 Subject: [PATCH 3/3] Update generated BUILD files --- plugin/pkg/scheduler/BUILD | 1 + plugin/pkg/scheduler/core/BUILD | 2 ++ 2 files changed, 3 insertions(+) diff --git a/plugin/pkg/scheduler/BUILD b/plugin/pkg/scheduler/BUILD index a235d16123f..f318e480ec5 100644 --- a/plugin/pkg/scheduler/BUILD +++ b/plugin/pkg/scheduler/BUILD @@ -43,6 +43,7 @@ go_library( "//pkg/client/listers/core/v1:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//plugin/pkg/scheduler/api:go_default_library", + "//plugin/pkg/scheduler/core:go_default_library", "//plugin/pkg/scheduler/metrics:go_default_library", "//plugin/pkg/scheduler/schedulercache:go_default_library", "//plugin/pkg/scheduler/util:go_default_library", diff --git a/plugin/pkg/scheduler/core/BUILD b/plugin/pkg/scheduler/core/BUILD index 3945f1e43ed..c38ffa8ee25 100644 --- a/plugin/pkg/scheduler/core/BUILD +++ b/plugin/pkg/scheduler/core/BUILD @@ -11,6 +11,7 @@ load( go_test( name = "go_default_test", srcs = [ + "equivalence_cache_test.go", "extender_test.go", "generic_scheduler_test.go", ], @@ -53,6 +54,7 @@ go_library( "//vendor:github.com/golang/groupcache/lru", "//vendor:k8s.io/apimachinery/pkg/util/errors", "//vendor:k8s.io/apimachinery/pkg/util/net", + "//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apiserver/pkg/util/trace", "//vendor:k8s.io/client-go/rest", "//vendor:k8s.io/client-go/util/workqueue",