diff --git a/plugin/pkg/scheduler/core/extender_test.go b/plugin/pkg/scheduler/core/extender_test.go index bcaa865d073..d9d5b83fe83 100644 --- a/plugin/pkg/scheduler/core/extender_test.go +++ b/plugin/pkg/scheduler/core/extender_test.go @@ -293,7 +293,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}) } scheduler := NewGenericScheduler( - cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders) + cache, nil, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders) podIgnored := &v1.Pod{} machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) if test.expectsErr { diff --git a/plugin/pkg/scheduler/core/generic_scheduler.go b/plugin/pkg/scheduler/core/generic_scheduler.go index 3d24aa8c515..928b7ea7c75 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler.go +++ b/plugin/pkg/scheduler/core/generic_scheduler.go @@ -69,6 +69,7 @@ func (f *FitError) Error() string { type genericScheduler struct { cache schedulercache.Cache + equivalenceCache *EquivalenceCache predicates map[string]algorithm.FitPredicate priorityMetaProducer algorithm.MetadataProducer predicateMetaProducer algorithm.MetadataProducer @@ -79,8 +80,6 @@ type genericScheduler struct { lastNodeIndex uint64 cachedNodeInfoMap map[string]*schedulercache.NodeInfo - - equivalenceCache *EquivalenceCache } // Schedule tries to schedule the given pod to one of node in the node list. 
@@ -104,10 +103,8 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister return "", err } - // TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here - trace.Step("Computing predicates") - filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer) + filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache) if err != nil { return "", err } @@ -158,6 +155,7 @@ func findNodesThatFit( predicateFuncs map[string]algorithm.FitPredicate, extenders []algorithm.SchedulerExtender, metadataProducer algorithm.MetadataProducer, + ecache *EquivalenceCache, ) ([]*v1.Node, FailedPredicateMap, error) { var filtered []*v1.Node failedPredicateMap := FailedPredicateMap{} @@ -176,7 +174,7 @@ func findNodesThatFit( meta := metadataProducer(pod, nodeNameToInfo) checkNode := func(i int) { nodeName := nodes[i].Name - fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs) + fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache) if err != nil { predicateResultLock.Lock() errs = append(errs, err) @@ -221,15 +219,45 @@ func findNodesThatFit( } // Checks whether node with a given name and NodeInfo satisfies all predicateFuncs. 
-func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, []algorithm.PredicateFailureReason, error) { - var failedPredicates []algorithm.PredicateFailureReason - for _, predicate := range predicateFuncs { - fit, reasons, err := predicate(pod, meta, info) - if err != nil { - err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err) - return false, []algorithm.PredicateFailureReason{}, err +func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, + ecache *EquivalenceCache) (bool, []algorithm.PredicateFailureReason, error) { + var ( + equivalenceHash uint64 + failedPredicates []algorithm.PredicateFailureReason + eCacheAvailable bool + invalid bool + fit bool + reasons []algorithm.PredicateFailureReason + err error + ) + if ecache != nil { + // getHashEquivalencePod will return immediately if no equivalence pod found + equivalenceHash = ecache.getHashEquivalencePod(pod) + eCacheAvailable = (equivalenceHash != 0) + } + for predicateKey, predicate := range predicateFuncs { + // If equivalenceCache is available + if eCacheAvailable { + // PredicateWithECache will return its cached predicate results + fit, reasons, invalid = ecache.PredicateWithECache(pod, info.Node().GetName(), predicateKey, equivalenceHash) } + + if !eCacheAvailable || invalid { + // we need to execute predicate functions since equivalence cache does not work + fit, reasons, err = predicate(pod, meta, info) + if err != nil { + return false, []algorithm.PredicateFailureReason{}, err + } + + if eCacheAvailable { + // update equivalence cache with newly computed fit & reasons + // TODO(resouer) should we do this in another thread? any race? 
+ ecache.UpdateCachedPredicateItem(pod, info.Node().GetName(), predicateKey, fit, reasons, equivalenceHash) + } + } + if !fit { + // eCache is available and valid, and predicates result is unfit, record the fail reasons failedPredicates = append(failedPredicates, reasons...) } } @@ -386,6 +414,7 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf func NewGenericScheduler( cache schedulercache.Cache, + eCache *EquivalenceCache, predicates map[string]algorithm.FitPredicate, predicateMetaProducer algorithm.MetadataProducer, prioritizers []algorithm.PriorityConfig, @@ -393,6 +422,7 @@ func NewGenericScheduler( extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm { return &genericScheduler{ cache: cache, + equivalenceCache: eCache, predicates: predicates, predicateMetaProducer: predicateMetaProducer, prioritizers: prioritizers, diff --git a/plugin/pkg/scheduler/core/generic_scheduler_test.go b/plugin/pkg/scheduler/core/generic_scheduler_test.go index c78c2c65f5c..628587b7d57 100644 --- a/plugin/pkg/scheduler/core/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/core/generic_scheduler_test.go @@ -307,7 +307,7 @@ func TestGenericScheduler(t *testing.T) { } scheduler := NewGenericScheduler( - cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, + cache, nil, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, []algorithm.SchedulerExtender{}) machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes))) @@ -328,7 +328,7 @@ func TestFindFitAllError(t *testing.T) { "2": schedulercache.NewNodeInfo(), "1": schedulercache.NewNodeInfo(), } - _, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer) + _, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, 
algorithm.EmptyMetadataProducer, nil) if err != nil { t.Errorf("unexpected error: %v", err) @@ -362,7 +362,7 @@ func TestFindFitSomeError(t *testing.T) { nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}) } - _, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer) + _, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer, nil) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 5391944eeaa..91bfde1c2a5 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -189,7 +189,7 @@ func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister { return c.scheduledPodLister } -// TODO(harryz) need to update all the handlers here and below for equivalence cache +// TODO(resouer) need to update all the handlers here and below for equivalence cache func (c *ConfigFactory) addPodToCache(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { @@ -370,7 +370,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, } f.Run() - algo := core.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders) + // TODO(resouer) use equivalence cache instead of nil here when #36238 gets merged + algo := core.NewGenericScheduler(f.schedulerCache, nil, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders) podBackoff := util.CreateDefaultPodBackoff() return &scheduler.Config{ SchedulerCache: f.schedulerCache, diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index f151d1e0bdb..84bc88f1587 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -27,6 +27,7 @@ import ( 
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/core" "k8s.io/kubernetes/plugin/pkg/scheduler/metrics" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" "k8s.io/kubernetes/plugin/pkg/scheduler/util" @@ -92,9 +93,12 @@ type Config struct { // It is expected that changes made via SchedulerCache will be observed // by NodeLister and Algorithm. SchedulerCache schedulercache.Cache - NodeLister algorithm.NodeLister - Algorithm algorithm.ScheduleAlgorithm - Binder Binder + // Ecache is used to optimistically invalidate affected cache items after + // successfully binding a pod + Ecache *core.EquivalenceCache + NodeLister algorithm.NodeLister + Algorithm algorithm.ScheduleAlgorithm + Binder Binder // PodConditionUpdater is used only in case of scheduling errors. If we succeed // with scheduling, PodScheduled condition will be updated in apiserver in /bind // handler so that binding and setting PodCondition it is atomic. @@ -193,6 +197,13 @@ func (sched *Scheduler) scheduleOne() { return } + // Optimistically assume that the binding will succeed, so we need to invalidate affected + // predicates in equivalence cache. + // If the binding fails, these invalidated items will not break anything. 
+ if sched.config.Ecache != nil { + sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(pod, dest) + } + go func() { defer metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 9f9de1dc0b6..f438bd11eef 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -480,6 +480,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate) (*Scheduler, chan *v1.Binding, chan error) { algo := core.NewGenericScheduler( scache, + nil, predicateMap, algorithm.EmptyMetadataProducer, []algorithm.PriorityConfig{}, @@ -510,6 +511,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache. func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { algo := core.NewGenericScheduler( scache, + nil, predicateMap, algorithm.EmptyMetadataProducer, []algorithm.PriorityConfig{},