Use scheduler cache in affinity priority functions

Make the cache implement NodeLister and expose it to the priority
functions. This way, the priority functions make use of a single cache,
the scheduler's, instead of mixing it with the lister's caches.

Signed-off-by: Aldo Culquicondor <acondor@google.com>
This commit is contained in:
Aldo Culquicondor 2019-07-16 10:51:39 -04:00
parent ac2c1ce08a
commit f58abdf966
9 changed files with 53 additions and 54 deletions

View File

@ -79,7 +79,6 @@ go_test(
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/events:go_default_library",
],

View File

@ -34,7 +34,7 @@ var NodeFieldSelectorKeys = map[string]func(*v1.Node) string{
type NodeLister interface {
// We explicitly return []*v1.Node, instead of v1.NodeList, to avoid
// performing expensive copies that are unneeded.
List() ([]*v1.Node, error)
ListNodes() []*v1.Node
}
// PodFilter is a function to filter a pod. If pod passed return true else return false.

View File

@ -195,7 +195,7 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
return result, prefilterStatus.AsError()
}
nodes, err := nodeLister.List()
nodes := nodeLister.ListNodes()
if err != nil {
return result, err
}
@ -324,10 +324,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister,
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return nil, nil, nil, nil
}
allNodes, err := nodeLister.List()
if err != nil {
return nil, nil, nil, err
}
allNodes := nodeLister.ListNodes()
if len(allNodes) == 0 {
return nil, nil, nil, ErrNoNodesAvailable
}

View File

@ -149,10 +149,12 @@ type Configurator interface {
GetPredicateMetadataProducer() (predicates.PredicateMetadataProducer, error)
GetPredicates(predicateKeys sets.String) (map[string]predicates.FitPredicate, error)
// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeLister() corelisters.NodeLister
// Exposed for testing
GetClient() clientset.Interface
// TODO(#80216): Remove these methods from the interface.
// Needs to be exposed for things like integration tests where we want to make fake nodes.
GetNodeLister() corelisters.NodeLister
// Exposed for testing
GetScheduledPodLister() corelisters.PodLister
@ -163,6 +165,7 @@ type Configurator interface {
}
// configFactory is the default implementation of the scheduler.Configurator interface.
// TODO(#80216): Remove pod and node listers.
type configFactory struct {
client clientset.Interface
// a means to list all known scheduled pods.
@ -486,7 +489,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
return &Config{
SchedulerCache: c.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: &nodeLister{c.nodeLister},
NodeLister: c.schedulerCache,
Algorithm: algo,
GetBinder: getBinderFunc(c.client, extenders),
PodConditionUpdater: &podConditionUpdater{c.client},
@ -521,14 +524,6 @@ func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerEx
}
}
type nodeLister struct {
corelisters.NodeLister
}
func (n *nodeLister) List() ([]*v1.Node, error) {
return n.NodeLister.List(labels.Everything())
}
func (c *configFactory) GetPriorityFunctionConfigs(priorityKeys sets.String) ([]priorities.PriorityConfig, error) {
pluginArgs, err := c.getPluginArgs()
if err != nil {
@ -571,7 +566,7 @@ func (c *configFactory) getPluginArgs() (*PluginFactoryArgs, error) {
ControllerLister: c.controllerLister,
ReplicaSetLister: c.replicaSetLister,
StatefulSetLister: c.statefulSetLister,
NodeLister: &nodeLister{c.nodeLister},
NodeLister: c.schedulerCache,
PDBLister: c.pdbLister,
NodeInfo: c.schedulerCache,
PVInfo: &predicates.CachedPersistentVolumeInfo{PersistentVolumeLister: c.pVLister},

View File

@ -715,8 +715,24 @@ func (cache *schedulerCache) GetNodeInfo(nodeName string) (*v1.Node, error) {
n, ok := cache.nodes[nodeName]
if !ok {
return nil, fmt.Errorf("error retrieving node '%v' from cache", nodeName)
return nil, fmt.Errorf("node %q not found in cache", nodeName)
}
return n.info.Node(), nil
}
// ListNodes returns the cached list of nodes.
// The slice itself is freshly allocated per call, but the *v1.Node
// elements are the cache's own objects — callers must treat them as
// read-only.
func (cache *schedulerCache) ListNodes() []*v1.Node {
// Hold the read lock for the full iteration so the node map cannot
// change underneath us.
cache.mu.RLock()
defer cache.mu.RUnlock()
nodes := make([]*v1.Node, 0, len(cache.nodes))
for _, node := range cache.nodes {
// Node info is sometimes not removed immediately. See schedulerCache.RemoveNode.
// Such entries have a nil Node() and are filtered out here.
n := node.info.Node()
if n != nil {
nodes = append(nodes, n)
}
}
return nodes
}

View File

@ -109,3 +109,8 @@ func (c *Cache) NodeTree() *internalcache.NodeTree { return nil }
func (c *Cache) GetNodeInfo(nodeName string) (*v1.Node, error) {
return nil, nil
}
// ListNodes is a fake method for testing. It always returns nil,
// satisfying the NodeLister portion of the Cache interface without
// tracking any nodes.
func (c *Cache) ListNodes() []*v1.Node {
return nil
}

View File

@ -17,9 +17,8 @@ limitations under the License.
package cache
import (
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
storagev1beta1 "k8s.io/api/storage/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -59,6 +58,9 @@ import (
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
type Cache interface {
algorithm.PodLister
algorithm.NodeLister
// AssumePod assumes a pod scheduled and aggregates the pod's information into its node.
// The implementation also decides the policy to expire pod before being confirmed (receiving Add event).
// After expiration, its information would be subtracted.
@ -113,12 +115,6 @@ type Cache interface {
// GetNodeInfo returns the node object with node string.
GetNodeInfo(nodeName string) (*v1.Node, error)
// List lists all cached pods (including assumed ones).
List(labels.Selector) ([]*v1.Pod, error)
// FilteredList returns all cached pods that pass the filter.
FilteredList(filter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error)
// Snapshot takes a snapshot on current cache
Snapshot() *Snapshot

View File

@ -26,7 +26,7 @@ import (
"testing"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/api/events/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -39,7 +39,6 @@ import (
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
corelister "k8s.io/client-go/listers/core/v1"
clientcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events"
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
@ -95,14 +94,6 @@ func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error {
return nil
}
type nodeLister struct {
corelister.NodeLister
}
func (n *nodeLister) List() ([]*v1.Node, error) {
return n.NodeLister.List(labels.Everything())
}
func podWithID(id, desiredHost string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@ -270,7 +261,6 @@ func TestScheduler(t *testing.T) {
stop := make(chan struct{})
defer close(stop)
informerFactory := informers.NewSharedInformerFactory(client, 0)
nl := informerFactory.Core().V1().Nodes().Lister()
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
@ -282,18 +272,19 @@ func TestScheduler(t *testing.T) {
var gotForgetPod *v1.Pod
var gotAssumedPod *v1.Pod
var gotBinding *v1.Binding
sCache := &fakecache.Cache{
ForgetFunc: func(pod *v1.Pod) {
gotForgetPod = pod
},
AssumeFunc: func(pod *v1.Pod) {
gotAssumedPod = pod
},
}
s := NewFromConfig(&factory.Config{
SchedulerCache: &fakecache.Cache{
ForgetFunc: func(pod *v1.Pod) {
gotForgetPod = pod
},
AssumeFunc: func(pod *v1.Pod) {
gotAssumedPod = pod
},
},
NodeLister: &nodeLister{nl},
Algorithm: item.algo,
SchedulerCache: sCache,
NodeLister: sCache,
Algorithm: item.algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
gotBinding = b
@ -669,7 +660,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
config := &factory.Config{
SchedulerCache: scache,
NodeLister: &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
NodeLister: scache,
Algorithm: algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {
@ -722,7 +713,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
sched := NewFromConfig(&factory.Config{
SchedulerCache: scache,
NodeLister: &nodeLister{informerFactory.Core().V1().Nodes().Lister()},
NodeLister: scache,
Algorithm: algo,
GetBinder: func(pod *v1.Pod) factory.Binder {
return fakeBinder{func(b *v1.Binding) error {

View File

@ -33,9 +33,9 @@ var _ algorithm.NodeLister = &FakeNodeLister{}
// FakeNodeLister implements NodeLister on a []*v1.Node for test purposes.
type FakeNodeLister []*v1.Node
// List returns nodes as a []string.
func (f FakeNodeLister) List() ([]*v1.Node, error) {
return f, nil
// ListNodes returns nodes as a []*v1.Node.
// Since FakeNodeLister is itself a slice of nodes, the receiver is
// returned directly — callers share the underlying array.
func (f FakeNodeLister) ListNodes() []*v1.Node {
return f
}
var _ algorithm.PodLister = &FakePodLister{}