reuse InformerFactory in scheduler tests (#107835)

* reuse informer in scheduler tests

Signed-off-by: kerthcet <kerthcet@gmail.com>

* avoid constructing two informer factories

Signed-off-by: kerthcet <kerthcet@gmail.com>

* fix informerFactory instantiation error

Signed-off-by: kerthcet <kerthcet@gmail.com>
Kante authored 2022-02-10 08:53:58 +08:00; committed by GitHub
parent 0dcd6eaa0d
commit 62eb70c1b3
3 changed files with 42 additions and 41 deletions
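
In short, each test previously built its own SharedInformerFactory while NewTestQueue built a second one internally; this change threads a single factory through both. A minimal sketch of the resulting pattern inside a scheduler test (it mirrors the event-handlers diff below; the fake clientset and zero resync period are taken from the tests themselves):

// Sketch only, not part of the diff: one factory backs both the scheduling
// queue and whatever event handlers the test registers on it.
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
schedulingQueue := queue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
testSched := Scheduler{
	StopEverything:  ctx.Done(),
	SchedulingQueue: schedulingQueue,
}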


@@ -373,6 +373,7 @@ func TestAddAllEventHandlers(t *testing.T) {
expectStaticInformers: map[reflect.Type]bool{
reflect.TypeOf(&v1.Pod{}): true,
reflect.TypeOf(&v1.Node{}): true,
+ reflect.TypeOf(&v1.Namespace{}): true,
},
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
},
@@ -386,6 +387,7 @@ func TestAddAllEventHandlers(t *testing.T) {
expectStaticInformers: map[reflect.Type]bool{
reflect.TypeOf(&v1.Pod{}): true,
reflect.TypeOf(&v1.Node{}): true,
+ reflect.TypeOf(&v1.Namespace{}): true,
reflect.TypeOf(&v1.PersistentVolume{}): true,
reflect.TypeOf(&storagev1beta1.CSIStorageCapacity{}): true,
},
@@ -400,6 +402,7 @@ func TestAddAllEventHandlers(t *testing.T) {
expectStaticInformers: map[reflect.Type]bool{
reflect.TypeOf(&v1.Pod{}): true,
reflect.TypeOf(&v1.Node{}): true,
+ reflect.TypeOf(&v1.Namespace{}): true,
},
expectDynamicInformers: map[schema.GroupVersionResource]bool{
{Group: "apps", Version: "v1", Resource: "daemonsets"}: true,
@@ -415,6 +418,7 @@ func TestAddAllEventHandlers(t *testing.T) {
expectStaticInformers: map[reflect.Type]bool{
reflect.TypeOf(&v1.Pod{}): true,
reflect.TypeOf(&v1.Node{}): true,
+ reflect.TypeOf(&v1.Namespace{}): true,
},
expectDynamicInformers: map[schema.GroupVersionResource]bool{
{Group: "apps", Version: "v1", Resource: "daemonsets"}: true,
@@ -433,13 +437,14 @@ func TestAddAllEventHandlers(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
+ schedulingQueue := queue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
testSched := Scheduler{
StopEverything: ctx.Done(),
- SchedulingQueue: queue.NewTestQueue(ctx, nil),
+ SchedulingQueue: schedulingQueue,
}
- client := fake.NewSimpleClientset()
- informerFactory := informers.NewSharedInformerFactory(client, 0)
dynclient := dyfake.NewSimpleDynamicClient(scheme)
dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)
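
Reviewer note: the new v1.Namespace entries in expectStaticInformers above appear to fall out of sharing the factory; NewPriorityQueue registers a namespaces informer on the factory it is handed, so once the test queue uses the test's own factory that informer is counted among the static informers.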


@@ -25,6 +25,11 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
)
+ // NewTestQueue creates a priority queue with an empty informer factory.
+ func NewTestQueue(ctx context.Context, lessFn framework.LessFunc, opts ...Option) *PriorityQueue {
+ return NewTestQueueWithObjects(ctx, lessFn, nil, opts...)
+ }
// NewTestQueueWithObjects creates a priority queue with an informer factory
// populated with the provided objects.
func NewTestQueueWithObjects(
@@ -34,13 +39,17 @@ func NewTestQueueWithObjects(
opts ...Option,
) *PriorityQueue {
informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objs...), 0)
+ return NewTestQueueWithInformerFactory(ctx, lessFn, informerFactory, opts...)
+ }
+ func NewTestQueueWithInformerFactory(
+ ctx context.Context,
+ lessFn framework.LessFunc,
+ informerFactory informers.SharedInformerFactory,
+ opts ...Option,
+ ) *PriorityQueue {
pq := NewPriorityQueue(lessFn, informerFactory, opts...)
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
return pq
}
- // NewTestQueue creates a priority queue with an empty informer factory.
- func NewTestQueue(ctx context.Context, lessFn framework.LessFunc, opts ...Option) *PriorityQueue {
- return NewTestQueueWithObjects(ctx, lessFn, nil, opts...)
- }
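
For reference, the three constructors above now layer on one another: NewTestQueue builds an empty factory via NewTestQueueWithObjects, which hands its factory to NewTestQueueWithInformerFactory, which starts the factory and waits for cache sync. A hedged usage sketch from a hypothetical test in this package (the pod object and test name are made up for illustration; the constructor signatures come from the code above):

package queue

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func TestQueueConstruction(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Empty informer factory, built and started internally.
	_ = NewTestQueue(ctx, nil)

	// Internally built factory pre-populated with objects.
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "default"}}
	_ = NewTestQueueWithObjects(ctx, nil, []runtime.Object{pod})

	// Caller-owned factory that the rest of the test can keep using; the
	// helper starts it and waits for its caches to sync.
	informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
	_ = NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
}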


@@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes/fake"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
clienttesting "k8s.io/client-go/testing"
@@ -390,13 +391,6 @@ func TestSchedulerScheduleOne(t *testing.T) {
},
}
- stop := make(chan struct{})
- defer close(stop)
- informerFactory := informers.NewSharedInformerFactory(client, 0)
- informerFactory.Start(stop)
- informerFactory.WaitForCacheSync(stop)
for _, item := range table {
t.Run(item.name, func(t *testing.T) {
var gotError error
@@ -440,6 +434,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := &Scheduler{
SchedulerCache: sCache,
Algorithm: item.algo,
@@ -647,15 +642,13 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
pod := podWithPort("pod.Name", "", 8080)
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
scache.AddNode(&node)
- client := clientsetfake.NewSimpleClientset(&node)
- informerFactory := informers.NewSharedInformerFactory(client, 0)
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
}
- scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, informerFactory, pod, &node, fns...)
+ scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, pod, &node, fns...)
waitPodExpireChan := make(chan struct{})
timeout := make(chan struct{})
@@ -714,14 +707,12 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
firstPod := podWithPort("pod.Name", "", 8080)
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "machine1", UID: types.UID("machine1")}}
scache.AddNode(&node)
- client := clientsetfake.NewSimpleClientset(&node)
- informerFactory := informers.NewSharedInformerFactory(client, 0)
fns := []st.RegisterPluginFunc{
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
}
- scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, informerFactory, firstPod, &node, fns...)
+ scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, firstPod, &node, fns...)
// We use conflicted pod ports to incur fit predicate failure.
secondPod := podWithPort("bar", "", 8080)
@@ -780,12 +771,8 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(ctx context.Context, t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
- informerFactory informers.SharedInformerFactory, pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
- scheduler, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...)
- informerFactory.Start(ctx.Done())
- informerFactory.WaitForCacheSync(ctx.Done())
+ pod *v1.Pod, node *v1.Node, fns ...st.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
+ scheduler, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, nil, nil, fns...)
queuedPodStore.Add(pod)
// queuedPodStore: [foo:8080]
@@ -850,8 +837,6 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
nodes = append(nodes, &node)
objects = append(objects, &node)
}
- client := clientsetfake.NewSimpleClientset(objects...)
- informerFactory := informers.NewSharedInformerFactory(client, 0)
// Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary.
failedNodeStatues := framework.NodeToStatusMap{}
@@ -867,10 +852,9 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
st.RegisterPluginAsExtensions(noderesources.Name, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewFit), "Filter", "PreFilter"),
}
- scheduler, _, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...)
- informerFactory.Start(ctx.Done())
- informerFactory.WaitForCacheSync(ctx.Done())
+ informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(objects...), 0)
+ scheduler, _, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, nil, fns...)
queuedPodStore.Add(podWithTooBigResourceRequests)
scheduler.scheduleOne(ctx)
@@ -916,6 +900,11 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, s
recorder = &events.FakeRecorder{}
}
+ if informerFactory == nil {
+ informerFactory = informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
+ }
+ schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
fwk, _ := st.NewFramework(
fns,
testSchedulerName,
@@ -945,7 +934,7 @@ func setupTestScheduler(ctx context.Context, queuedPodStore *clientcache.FIFO, s
testSchedulerName: fwk,
},
client: client,
- SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
+ SchedulingQueue: schedulingQueue,
}
return sched, bindingChan, errChan
@@ -975,8 +964,6 @@ func setupTestSchedulerWithVolumeBinding(ctx context.Context, volumeBinder volum
}, "PreFilter", "Filter", "Reserve", "PreBind"),
}
s, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, broadcaster, fns...)
- informerFactory.Start(ctx.Done())
- informerFactory.WaitForCacheSync(ctx.Done())
return s, bindingChan, errChan
}
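
Because NewTestQueueWithInformerFactory (called inside setupTestScheduler) now starts the shared factory and waits for cache sync, callers drop their own Start/WaitForCacheSync calls, as the deletions above show. A caller-side sketch using the variables from setupTestSchedulerWithVolumeBinding (surrounding setup elided):

// informerFactory is whatever factory the test already uses; the queue helper
// invoked inside setupTestScheduler starts it and syncs its caches, so the
// two calls deleted above are no longer needed at this call site.
s, bindingChan, errChan := setupTestScheduler(ctx, queuedPodStore, scache, informerFactory, broadcaster, fns...)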