diff --git a/pkg/scheduler/BUILD b/pkg/scheduler/BUILD index 7880deda529..22585cbad75 100644 --- a/pkg/scheduler/BUILD +++ b/pkg/scheduler/BUILD @@ -54,12 +54,12 @@ go_test( "//pkg/scheduler/factory:go_default_library", "//pkg/scheduler/internal/cache:go_default_library", "//pkg/scheduler/internal/cache/fake:go_default_library", - "//pkg/scheduler/testing:go_default_library", "//pkg/scheduler/volumebinder:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", @@ -67,6 +67,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", ], diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 79219115fed..699418ac840 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/sets" @@ -34,6 +35,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" + corelister "k8s.io/client-go/listers/core/v1" clientcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/legacyscheme" @@ -47,7 +49,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler/factory" schedulerinternalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake" - schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" "k8s.io/kubernetes/pkg/scheduler/volumebinder" ) @@ -81,12 +82,20 @@ func (fp fakePodPreemptor) RemoveNominatedNodeName(pod *v1.Pod) error { return nil } +type nodeLister struct { + corelister.NodeLister +} + +func (n *nodeLister) List() ([]*v1.Node, error) { + return n.NodeLister.List(labels.Everything()) +} + func podWithID(id, desiredHost string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: id, UID: types.UID(id), - SelfLink: schedulertesting.Test.SelfLink(string(v1.ResourcePods), id), + SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id), }, Spec: v1.PodSpec{ NodeName: desiredHost, @@ -100,8 +109,8 @@ func deletingPod(id string) *v1.Pod { ObjectMeta: metav1.ObjectMeta{ Name: id, UID: types.UID(id), - SelfLink: schedulertesting.Test.SelfLink(string(v1.ResourcePods), id), DeletionTimestamp: &deletionTimestamp, + SelfLink: fmt.Sprintf("/api/v1/%s/%s", string(v1.ResourcePods), id), }, Spec: v1.PodSpec{ NodeName: "", @@ -239,6 +248,15 @@ func TestScheduler(t *testing.T) { }, } + stop := make(chan struct{}) + defer close(stop) + client := clientsetfake.NewSimpleClientset(&testNode) + informerFactory := informers.NewSharedInformerFactory(client, 0) + nl := informerFactory.Core().V1().Nodes().Lister() + + informerFactory.Start(stop) + informerFactory.WaitForCacheSync(stop) + for _, item := range table { t.Run(item.name, func(t *testing.T) { var gotError error @@ -256,10 +274,8 @@ func TestScheduler(t *testing.T) { gotAssumedPod = pod }, }, - NodeLister: schedulertesting.FakeNodeLister( - []*v1.Node{&testNode}, - ), - Algorithm: item.algo, + NodeLister: &nodeLister{nl}, + Algorithm: item.algo, GetBinder: func(pod *v1.Pod) factory.Binder { return fakeBinder{func(b *v1.Binding) error { gotBinding = b @@ -317,9 +333,10 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { pod := podWithPort("pod.Name", "", 8080) node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} scache.AddNode(&node) - nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&node}) + client := clientsetfake.NewSimpleClientset(&node) + informerFactory := informers.NewSharedInformerFactory(client, 0) predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts} - scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, pod, &node) + scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, pod, &node) waitPodExpireChan := make(chan struct{}) timeout := make(chan struct{}) @@ -375,9 +392,10 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { firstPod := podWithPort("pod.Name", "", 8080) node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} scache.AddNode(&node) - nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&node}) + client := clientsetfake.NewSimpleClientset(&node) + informerFactory := informers.NewSharedInformerFactory(client, 0) predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts} - scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, firstPod, &node) + scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, informerFactory, stop, predicateMap, firstPod, &node) // We use conflicted pod ports to incur fit predicate failure. secondPod := podWithPort("bar", "", 8080) @@ -463,11 +481,16 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) { node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} scache.AddNode(&node) - nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&node}) + client := clientsetfake.NewSimpleClientset(&node) + informerFactory := informers.NewSharedInformerFactory(client, 0) predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts} scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry( - queuedPodStore, scache, nodeLister, predicateMap, stop, test.BindingDuration) + queuedPodStore, scache, informerFactory, predicateMap, stop, test.BindingDuration) + + informerFactory.Start(stop) + informerFactory.WaitForCacheSync(stop) + scheduler.Run() queuedPodStore.Add(firstPod) queuedPodStore.Add(conflictPod) @@ -495,9 +518,12 @@ func TestSchedulerErrorWithLongBinding(t *testing.T) { // queuedPodStore: pods queued before processing. // cache: scheduler cache that might contain assumed pods. func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, - nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) { + informerFactory informers.SharedInformerFactory, stop chan struct{}, predicateMap map[string]algorithm.FitPredicate, pod *v1.Pod, node *v1.Node) (*Scheduler, chan *v1.Binding, chan error) { - scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil) + scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil) + + informerFactory.Start(stop) + informerFactory.WaitForCacheSync(stop) queuedPodStore.Add(pod) // queuedPodStore: [foo:8080] @@ -540,7 +566,8 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { }) // create several nodes which cannot schedule the above pod - nodes := []*v1.Node{} + var nodes []*v1.Node + var objects []runtime.Object for i := 0; i < 100; i++ { uid := fmt.Sprintf("machine%v", i) node := v1.Node{ @@ -559,8 +586,10 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { } scache.AddNode(&node) nodes = append(nodes, &node) + objects = append(objects, &node) } - nodeLister := schedulertesting.FakeNodeLister(nodes) + client := clientsetfake.NewSimpleClientset(objects...) + informerFactory := informers.NewSharedInformerFactory(client, 0) predicateMap := map[string]algorithm.FitPredicate{ "PodFitsResources": predicates.PodFitsResources, } @@ -573,7 +602,10 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { predicates.NewInsufficientResourceError(v1.ResourceMemory, 500, 0, 100), } } - scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, nil) + scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, nil) + + informerFactory.Start(stop) + informerFactory.WaitForCacheSync(stop) queuedPodStore.Add(podWithTooBigResourceRequests) scheduler.scheduleOne() @@ -597,7 +629,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { // queuedPodStore: pods queued before processing. // scache: scheduler cache that might contain assumed pods. -func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { +func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]algorithm.FitPredicate, recorder record.EventRecorder) (*Scheduler, chan *v1.Binding, chan error) { algo := core.NewGenericScheduler( scache, nil, @@ -608,8 +640,8 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern algorithm.EmptyPriorityMetadataProducer, []algorithm.SchedulerExtender{}, nil, - schedulertesting.FakePersistentVolumeClaimLister{}, - schedulertesting.FakePDBLister{}, + informerFactory.Core().V1().PersistentVolumeClaims().Lister(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(), false, false, api.DefaultPercentageOfNodesToScore) @@ -618,7 +650,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern configurator := &FakeConfigurator{ Config: &factory.Config{ SchedulerCache: scache, - NodeLister: nodeLister, + NodeLister: &nodeLister{informerFactory.Core().V1().Nodes().Lister()}, Algorithm: algo, GetBinder: func(pod *v1.Pod) factory.Binder { return fakeBinder{func(b *v1.Binding) error { @@ -648,7 +680,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulerintern return sched, bindingChan, errChan } -func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, nodeLister schedulertesting.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { +func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulerinternalcache.Cache, informerFactory informers.SharedInformerFactory, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) { algo := core.NewGenericScheduler( scache, nil, @@ -659,8 +691,8 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc algorithm.EmptyPriorityMetadataProducer, []algorithm.SchedulerExtender{}, nil, - schedulertesting.FakePersistentVolumeClaimLister{}, - schedulertesting.FakePDBLister{}, + informerFactory.Core().V1().PersistentVolumeClaims().Lister(), + informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(), false, false, api.DefaultPercentageOfNodesToScore) @@ -668,7 +700,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc configurator := &FakeConfigurator{ Config: &factory.Config{ SchedulerCache: scache, - NodeLister: nodeLister, + NodeLister: &nodeLister{informerFactory.Core().V1().Nodes().Lister()}, Algorithm: algo, GetBinder: func(pod *v1.Pod) factory.Binder { return fakeBinder{func(b *v1.Binding) error { @@ -701,18 +733,21 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBinder, stop <-chan struct{}, broadcaster record.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) { testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}} - nodeLister := schedulertesting.FakeNodeLister([]*v1.Node{&testNode}) queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc) queuedPodStore.Add(podWithID("foo", "")) scache := schedulerinternalcache.New(10*time.Minute, stop) scache.AddNode(&testNode) + client := clientsetfake.NewSimpleClientset(&testNode) + informerFactory := informers.NewSharedInformerFactory(client, 0) predicateMap := map[string]algorithm.FitPredicate{ predicates.CheckVolumeBindingPred: predicates.NewVolumeBindingPredicate(fakeVolumeBinder), } recorder := broadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}) - s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap, recorder) + s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder) + informerFactory.Start(stop) + informerFactory.WaitForCacheSync(stop) s.config.VolumeBinder = fakeVolumeBinder return s, bindingChan, errChan }