From f58abdf96690cb70cc526ca87d35733122ff2210 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Tue, 16 Jul 2019 10:51:39 -0400 Subject: [PATCH] Use scheduler cache in affinity priority functions Make the cache implement NodeLister and expose it to the priority functions. This way, the priority functions make use of a single cache, the scheduler's, instead of mixing it with the lister's caches. Signed-off-by: Aldo Culquicondor --- pkg/scheduler/BUILD | 1 - pkg/scheduler/algorithm/types.go | 2 +- pkg/scheduler/core/generic_scheduler.go | 7 +--- pkg/scheduler/factory/factory.go | 19 ++++------ pkg/scheduler/internal/cache/cache.go | 18 ++++++++- .../internal/cache/fake/fake_cache.go | 5 +++ pkg/scheduler/internal/cache/interface.go | 12 ++---- pkg/scheduler/scheduler_test.go | 37 +++++++------------ pkg/scheduler/testing/fake_lister.go | 6 +-- 9 files changed, 53 insertions(+), 54 deletions(-) diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index af8e283f336..4bcbf17c377 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -79,7 +79,6 @@ go_test( "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", - "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/events:go_default_library", ], diff --git a/pkg/scheduler/algorithm/types.go b/pkg/scheduler/algorithm/types.go index f7a818a2b9f..38aea9d7ab6 100644 --- a/pkg/scheduler/algorithm/types.go +++ b/pkg/scheduler/algorithm/types.go @@ -34,7 +34,7 @@ var NodeFieldSelectorKeys = map[string]func(*v1.Node) string{ type NodeLister interface { // We explicitly return []*v1.Node, instead of v1.NodeList, to avoid // performing expensive copies that are unneeded. - List() ([]*v1.Node, error) + ListNodes() []*v1.Node } // PodFilter is a function to filter a pod. If pod passed return true else return false. diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 4bd26daf157..43acf9ff951 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -195,7 +195,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister return result, prefilterStatus.AsError() } - nodes, err := nodeLister.List() + nodes := nodeLister.ListNodes() if err != nil { return result, err } @@ -324,10 +324,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name) return nil, nil, nil, nil } - allNodes, err := nodeLister.List() - if err != nil { - return nil, nil, nil, err - } + allNodes := nodeLister.ListNodes() if len(allNodes) == 0 { return nil, nil, nil, ErrNoNodesAvailable } diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index 67df1bf7441..52303a7a92d 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -149,10 +149,12 @@ type Configurator interface { GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error) GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error) - // Needs to be exposed for things like integration tests where we want to make fake nodes. - GetNodeLister() corelisters.NodeLister // Exposed for testing GetClient() clientset.Interface + + // TODO(#80216): Remove these methods from the interface. + // Needs to be exposed for things like integration tests where we want to make fake nodes. + GetNodeLister() corelisters.NodeLister // Exposed for testing GetScheduledPodLister() corelisters.PodLister @@ -163,6 +165,7 @@ type Configurator interface { } // configFactory is the default implementation of the scheduler.Configurator interface. +// TODO(#80216): Remove pod and node listers. type configFactory struct { client clientset.Interface // a means to list all known scheduled pods. @@ -486,7 +489,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, return &Config{ SchedulerCache: c.schedulerCache, // The scheduler only needs to consider schedulable nodes. - NodeLister: &nodeLister{c.nodeLister}, + NodeLister: c.schedulerCache, Algorithm: algo, GetBinder: getBinderFunc(c.client, extenders), PodConditionUpdater: &podConditionUpdater{c.client}, @@ -521,14 +524,6 @@ func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerEx } } -type nodeLister struct { - corelisters.NodeLister -} - -func (n *nodeLister) List() ([]*v1.Node, error) { - return n.NodeLister.List(labels.Everything()) -} - func (c *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, error) { pluginArgs, err := c.getPluginArgs() if err != nil { @@ -571,7 +566,7 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) { ControllerLister: c.controllerLister, ReplicaSetLister: c.replicaSetLister, StatefulSetLister: c.statefulSetLister, - NodeLister: &nodeLister{c.nodeLister}, + NodeLister: c.schedulerCache, PDBLister: c.pdbLister, NodeInfo: c.schedulerCache, PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister}, diff --git a/pkg/scheduler/internal/cache/cache.go b/pkg/scheduler/internal/cache/cache.go index 41b7fe65fcc..952a6124c7c 100644 --- a/pkg/scheduler/internal/cache/cache.go +++ b/pkg/scheduler/internal/cache/cache.go @@ -715,8 +715,24 @@ func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) { n, ok := cache.nodes[nodeName] if !ok { - return nil, fmt.Errorf("error retrieving node '%v' from cache", nodeName) + return nil, fmt.Errorf("node %q not found in cache", nodeName) } return n.info.Node(), nil } + +// ListNodes returns the cached list of nodes. +func (cache *schedulerCache) ListNodes() []*v1.Node { + cache.mu.RLock() + defer cache.mu.RUnlock() + + nodes := make([]*v1.Node, 0, len(cache.nodes)) + for _, node := range cache.nodes { + // Node info is sometimes not removed immediately. See schedulerCache.RemoveNode. + n := node.info.Node() + if n != nil { + nodes = append(nodes, n) + } + } + return nodes +} diff --git a/pkg/scheduler/internal/cache/fake/fake_cache.go b/pkg/scheduler/internal/cache/fake/fake_cache.go index cce3a657735..1ac9ec36d6d 100644 --- a/pkg/scheduler/internal/cache/fake/fake_cache.go +++ b/pkg/scheduler/internal/cache/fake/fake_cache.go @@ -109,3 +109,8 @@ func (c *Cache) NodeTree() *internalcache.NodeTree { return nil } func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) { return nil, nil } + +// ListNodes is a fake method for testing. +func (c *Cache) ListNodes() []*v1.Node { + return nil +} diff --git a/pkg/scheduler/internal/cache/interface.go b/pkg/scheduler/internal/cache/interface.go index e9e35324065..b034bd7148d 100644 --- a/pkg/scheduler/internal/cache/interface.go +++ b/pkg/scheduler/internal/cache/interface.go @@ -17,9 +17,8 @@ limitations under the License. package cache import ( - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" storagev1beta1 "k8s.io/api/storage/v1beta1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/kubernetes/pkg/scheduler/algorithm" schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) @@ -59,6 +58,9 @@ import ( // - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue, // a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache. type Cache interface { + algorithm.PodLister + algorithm.NodeLister + // AssumePod assumes a pod scheduled and aggregates the pod's information into its node. // The implementation also decides the policy to expire pod before being confirmed (receiving Add event). // After expiration, its information would be subtracted. @@ -113,12 +115,6 @@ type Cache interface { // GetNodeInfo returns the node object with node string. GetNodeInfo(nodeName string) (*v1.Node, error) - // List lists all cached pods (including assumed ones). - List(labels.Selector) ([]*v1.Pod, error) - - // FilteredList returns all cached pods that pass the filter. - FilteredList(filter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error) - // Snapshot takes a snapshot on current cache Snapshot() *Snapshot diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 8c8f76dcf5e..6acf45cc01f 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -26,7 +26,7 @@ import ( "testing" "time" - v1 "k8s.io/api/core/v1" + "k8s.io/api/core/v1" "k8s.io/api/events/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -39,7 +39,6 @@ import ( "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" - corelister "k8s.io/client-go/listers/core/v1" clientcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling" @@ -95,14 +94,6 @@ func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error { return nil } -type nodeLister struct { - corelister.NodeLister -} - -func (n *nodeLister) List() ([]*v1.Node, error) { - return n.NodeLister.List(labels.Everything()) -} - func podWithID(id, desiredHost string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -270,7 +261,6 @@ func TestScheduler(t *testing.T) { stop := make(chan struct{}) defer close(stop) informerFactory := informers.NewSharedInformerFactory(client, 0) - nl := informerFactory.Core().V1().Nodes().Lister() informerFactory.Start(stop) informerFactory.WaitForCacheSync(stop) @@ -282,18 +272,19 @@ func TestScheduler(t *testing.T) { var gotForgetPod *v1.Pod var gotAssumedPod *v1.Pod var gotBinding *v1.Binding + sCache := &fakecache.Cache{ + ForgetFunc: func(pod *v1.Pod) { + gotForgetPod = pod + }, + AssumeFunc: func(pod *v1.Pod) { + gotAssumedPod = pod + }, + } s := NewFromConfig(&factory.Config{ - SchedulerCache: &fakecache.Cache{ - ForgetFunc: func(pod *v1.Pod) { - gotForgetPod = pod - }, - AssumeFunc: func(pod *v1.Pod) { - gotAssumedPod = pod - }, - }, - NodeLister: &nodeLister{nl}, - Algorithm: item.algo, + SchedulerCache: sCache, + NodeLister: sCache, + Algorithm: item.algo, GetBinder: func(pod *v1.Pod) factory.Binder { return fakeBinder{func(b *v1.Binding) error { gotBinding = b @@ -669,7 +660,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C config := &factory.Config{ SchedulerCache: scache, - NodeLister: &nodeLister{informerFactory.Core().V1().Nodes().Lister()}, + NodeLister: scache, Algorithm: algo, GetBinder: func(pod *v1.Pod) factory.Binder { return fakeBinder{func(b *v1.Binding) error { @@ -722,7 +713,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc sched := NewFromConfig(&factory.Config{ SchedulerCache: scache, - NodeLister: &nodeLister{informerFactory.Core().V1().Nodes().Lister()}, + NodeLister: scache, Algorithm: algo, GetBinder: func(pod *v1.Pod) factory.Binder { return fakeBinder{func(b *v1.Binding) error { diff --git a/pkg/scheduler/testing/fake_lister.go b/pkg/scheduler/testing/fake_lister.go index 343800a388e..3c0c5bdc882 100644 --- a/pkg/scheduler/testing/fake_lister.go +++ b/pkg/scheduler/testing/fake_lister.go @@ -33,9 +33,9 @@ var _ algorithm.NodeLister = &FakeNodeLister{} // FakeNodeLister implements NodeLister on a []string for test purposes. type FakeNodeLister []*v1.Node -// List returns nodes as a []string. -func (f FakeNodeLister) List() ([]*v1.Node, error) { - return f, nil +// ListNodes returns nodes as a []*v1.Node. +func (f FakeNodeLister) ListNodes() []*v1.Node { + return f } var _ algorithm.PodLister = &FakePodLister{}