mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Enable equivalence cache in generic scheduler
This commit is contained in:
parent
819554f514
commit
2c4514c325
@ -293,7 +293,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
|
||||
}
|
||||
scheduler := NewGenericScheduler(
|
||||
cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
|
||||
cache, nil, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer, extenders)
|
||||
podIgnored := &v1.Pod{}
|
||||
machine, err := scheduler.Schedule(podIgnored, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
|
||||
if test.expectsErr {
|
||||
|
@ -69,6 +69,7 @@ func (f *FitError) Error() string {
|
||||
|
||||
type genericScheduler struct {
|
||||
cache schedulercache.Cache
|
||||
equivalenceCache *EquivalenceCache
|
||||
predicates map[string]algorithm.FitPredicate
|
||||
priorityMetaProducer algorithm.MetadataProducer
|
||||
predicateMetaProducer algorithm.MetadataProducer
|
||||
@ -79,8 +80,6 @@ type genericScheduler struct {
|
||||
lastNodeIndex uint64
|
||||
|
||||
cachedNodeInfoMap map[string]*schedulercache.NodeInfo
|
||||
|
||||
equivalenceCache *EquivalenceCache
|
||||
}
|
||||
|
||||
// Schedule tries to schedule the given pod to one of node in the node list.
|
||||
@ -104,10 +103,8 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
|
||||
return "", err
|
||||
}
|
||||
|
||||
// TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here
|
||||
|
||||
trace.Step("Computing predicates")
|
||||
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer)
|
||||
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -158,6 +155,7 @@ func findNodesThatFit(
|
||||
predicateFuncs map[string]algorithm.FitPredicate,
|
||||
extenders []algorithm.SchedulerExtender,
|
||||
metadataProducer algorithm.MetadataProducer,
|
||||
ecache *EquivalenceCache,
|
||||
) ([]*v1.Node, FailedPredicateMap, error) {
|
||||
var filtered []*v1.Node
|
||||
failedPredicateMap := FailedPredicateMap{}
|
||||
@ -176,7 +174,7 @@ func findNodesThatFit(
|
||||
meta := metadataProducer(pod, nodeNameToInfo)
|
||||
checkNode := func(i int) {
|
||||
nodeName := nodes[i].Name
|
||||
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs)
|
||||
fits, failedPredicates, err := podFitsOnNode(pod, meta, nodeNameToInfo[nodeName], predicateFuncs, ecache)
|
||||
if err != nil {
|
||||
predicateResultLock.Lock()
|
||||
errs = append(errs, err)
|
||||
@ -221,15 +219,45 @@ func findNodesThatFit(
|
||||
}
|
||||
|
||||
// Checks whether node with a given name and NodeInfo satisfies all predicateFuncs.
|
||||
func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
var failedPredicates []algorithm.PredicateFailureReason
|
||||
for _, predicate := range predicateFuncs {
|
||||
fit, reasons, err := predicate(pod, meta, info)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("SchedulerPredicates failed due to %v, which is unexpected.", err)
|
||||
return false, []algorithm.PredicateFailureReason{}, err
|
||||
func podFitsOnNode(pod *v1.Pod, meta interface{}, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate,
|
||||
ecache *EquivalenceCache) (bool, []algorithm.PredicateFailureReason, error) {
|
||||
var (
|
||||
equivalenceHash uint64
|
||||
failedPredicates []algorithm.PredicateFailureReason
|
||||
eCacheAvailable bool
|
||||
invalid bool
|
||||
fit bool
|
||||
reasons []algorithm.PredicateFailureReason
|
||||
err error
|
||||
)
|
||||
if ecache != nil {
|
||||
// getHashEquivalencePod will return immediately if no equivalence pod found
|
||||
equivalenceHash = ecache.getHashEquivalencePod(pod)
|
||||
eCacheAvailable = (equivalenceHash != 0)
|
||||
}
|
||||
for predicateKey, predicate := range predicateFuncs {
|
||||
// If equivalenceCache is available
|
||||
if eCacheAvailable {
|
||||
// PredicateWithECache will returns it's cached predicate results
|
||||
fit, reasons, invalid = ecache.PredicateWithECache(pod, info.Node().GetName(), predicateKey, equivalenceHash)
|
||||
}
|
||||
|
||||
if !eCacheAvailable || invalid {
|
||||
// we need to execute predicate functions since equivalence cache does not work
|
||||
fit, reasons, err = predicate(pod, meta, info)
|
||||
if err != nil {
|
||||
return false, []algorithm.PredicateFailureReason{}, err
|
||||
}
|
||||
|
||||
if eCacheAvailable {
|
||||
// update equivalence cache with newly computed fit & reasons
|
||||
// TODO(resouer) should we do this in another thread? any race?
|
||||
ecache.UpdateCachedPredicateItem(pod, info.Node().GetName(), predicateKey, fit, reasons, equivalenceHash)
|
||||
}
|
||||
}
|
||||
|
||||
if !fit {
|
||||
// eCache is available and valid, and predicates result is unfit, record the fail reasons
|
||||
failedPredicates = append(failedPredicates, reasons...)
|
||||
}
|
||||
}
|
||||
@ -386,6 +414,7 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf
|
||||
|
||||
func NewGenericScheduler(
|
||||
cache schedulercache.Cache,
|
||||
eCache *EquivalenceCache,
|
||||
predicates map[string]algorithm.FitPredicate,
|
||||
predicateMetaProducer algorithm.MetadataProducer,
|
||||
prioritizers []algorithm.PriorityConfig,
|
||||
@ -393,6 +422,7 @@ func NewGenericScheduler(
|
||||
extenders []algorithm.SchedulerExtender) algorithm.ScheduleAlgorithm {
|
||||
return &genericScheduler{
|
||||
cache: cache,
|
||||
equivalenceCache: eCache,
|
||||
predicates: predicates,
|
||||
predicateMetaProducer: predicateMetaProducer,
|
||||
prioritizers: prioritizers,
|
||||
|
@ -307,7 +307,7 @@ func TestGenericScheduler(t *testing.T) {
|
||||
}
|
||||
|
||||
scheduler := NewGenericScheduler(
|
||||
cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer,
|
||||
cache, nil, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, algorithm.EmptyMetadataProducer,
|
||||
[]algorithm.SchedulerExtender{})
|
||||
machine, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)))
|
||||
|
||||
@ -328,7 +328,7 @@ func TestFindFitAllError(t *testing.T) {
|
||||
"2": schedulercache.NewNodeInfo(),
|
||||
"1": schedulercache.NewNodeInfo(),
|
||||
}
|
||||
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer)
|
||||
_, predicateMap, err := findNodesThatFit(&v1.Pod{}, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer, nil)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
@ -362,7 +362,7 @@ func TestFindFitSomeError(t *testing.T) {
|
||||
nodeNameToInfo[name].SetNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}})
|
||||
}
|
||||
|
||||
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer)
|
||||
_, predicateMap, err := findNodesThatFit(pod, nodeNameToInfo, makeNodeList(nodes), predicates, nil, algorithm.EmptyMetadataProducer, nil)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -189,7 +189,7 @@ func (c *ConfigFactory) GetScheduledPodLister() corelisters.PodLister {
|
||||
return c.scheduledPodLister
|
||||
}
|
||||
|
||||
// TODO(harryz) need to update all the handlers here and below for equivalence cache
|
||||
// TODO(resouer) need to update all the handlers here and below for equivalence cache
|
||||
func (c *ConfigFactory) addPodToCache(obj interface{}) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
@ -370,7 +370,8 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
||||
}
|
||||
|
||||
f.Run()
|
||||
algo := core.NewGenericScheduler(f.schedulerCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
|
||||
// TODO(resouer) use equivalence cache instead of nil here when #36238 get merged
|
||||
algo := core.NewGenericScheduler(f.schedulerCache, nil, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)
|
||||
podBackoff := util.CreateDefaultPodBackoff()
|
||||
return &scheduler.Config{
|
||||
SchedulerCache: f.schedulerCache,
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
|
||||
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/core"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/metrics"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/util"
|
||||
@ -92,9 +93,12 @@ type Config struct {
|
||||
// It is expected that changes made via SchedulerCache will be observed
|
||||
// by NodeLister and Algorithm.
|
||||
SchedulerCache schedulercache.Cache
|
||||
NodeLister algorithm.NodeLister
|
||||
Algorithm algorithm.ScheduleAlgorithm
|
||||
Binder Binder
|
||||
// Ecache is used for optimistically invalid affected cache items after
|
||||
// successfully binding a pod
|
||||
Ecache *core.EquivalenceCache
|
||||
NodeLister algorithm.NodeLister
|
||||
Algorithm algorithm.ScheduleAlgorithm
|
||||
Binder Binder
|
||||
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
|
||||
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
|
||||
// handler so that binding and setting PodCondition it is atomic.
|
||||
@ -193,6 +197,13 @@ func (sched *Scheduler) scheduleOne() {
|
||||
return
|
||||
}
|
||||
|
||||
// Optimistically assume that the binding will succeed, so we need to invalidate affected
|
||||
// predicates in equivalence cache.
|
||||
// If the binding fails, these invalidated item will not break anything.
|
||||
if sched.config.Ecache != nil {
|
||||
sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(pod, dest)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
|
||||
|
||||
|
@ -480,6 +480,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
|
||||
func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate) (*Scheduler, chan *v1.Binding, chan error) {
|
||||
algo := core.NewGenericScheduler(
|
||||
scache,
|
||||
nil,
|
||||
predicateMap,
|
||||
algorithm.EmptyMetadataProducer,
|
||||
[]algorithm.PriorityConfig{},
|
||||
@ -510,6 +511,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
|
||||
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
|
||||
algo := core.NewGenericScheduler(
|
||||
scache,
|
||||
nil,
|
||||
predicateMap,
|
||||
algorithm.EmptyMetadataProducer,
|
||||
[]algorithm.PriorityConfig{},
|
||||
|
Loading…
Reference in New Issue
Block a user