diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index aea3bba428d..85cbdb8741d 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -444,7 +444,6 @@ func addAllEventHandlers( dynInformer.AddEventHandler( buildEvtResHandler(at, gvk, strings.Title(gvr.Resource)), ) - go dynInformer.Run(sched.StopEverything) } } } diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index d112103228d..31bf9b666c8 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -23,9 +23,18 @@ import ( "time" "github.com/google/go-cmp/cmp" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" "k8s.io/api/core/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/dynamicinformer" + dyfake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/internal/cache" "k8s.io/kubernetes/pkg/scheduler/internal/queue" @@ -334,3 +343,103 @@ func TestPreCheckForNode(t *testing.T) { }) } } + +// test for informers of resources we care about is registered +func TestAddAllEventHandlers(t *testing.T) { + tests := []struct { + name string + gvkMap map[framework.GVK]framework.ActionType + expectStaticInformers map[reflect.Type]bool + expectDynamicInformers map[schema.GroupVersionResource]bool + }{ + { + name: "default handlers in framework", + gvkMap: map[framework.GVK]framework.ActionType{}, + expectStaticInformers: map[reflect.Type]bool{ + reflect.TypeOf(&v1.Pod{}): true, + reflect.TypeOf(&v1.Node{}): true, + }, + expectDynamicInformers: map[schema.GroupVersionResource]bool{}, + }, + { + name: "add GVKs handlers defined in framework dynamically", + gvkMap: map[framework.GVK]framework.ActionType{ + "Pod": framework.Add | framework.Delete, + "PersistentVolume": framework.Delete, + "storage.k8s.io/CSIStorageCapacity": framework.Update, + }, + expectStaticInformers: map[reflect.Type]bool{ + reflect.TypeOf(&v1.Pod{}): true, + reflect.TypeOf(&v1.Node{}): true, + reflect.TypeOf(&v1.PersistentVolume{}): true, + reflect.TypeOf(&storagev1beta1.CSIStorageCapacity{}): true, + }, + expectDynamicInformers: map[schema.GroupVersionResource]bool{}, + }, + { + name: "add GVKs handlers defined in plugins dynamically", + gvkMap: map[framework.GVK]framework.ActionType{ + "daemonsets.v1.apps": framework.Add | framework.Delete, + "cronjobs.v1.batch": framework.Delete, + }, + expectStaticInformers: map[reflect.Type]bool{ + reflect.TypeOf(&v1.Pod{}): true, + reflect.TypeOf(&v1.Node{}): true, + }, + expectDynamicInformers: map[schema.GroupVersionResource]bool{ + {Group: "apps", Version: "v1", Resource: "daemonsets"}: true, + {Group: "batch", Version: "v1", Resource: "cronjobs"}: true, + }, + }, + { + name: "add GVKs handlers defined in plugins dynamically, with one illegal GVK form", + gvkMap: map[framework.GVK]framework.ActionType{ + "daemonsets.v1.apps": framework.Add | framework.Delete, + "custommetrics.v1beta1": framework.Update, + }, + expectStaticInformers: map[reflect.Type]bool{ + reflect.TypeOf(&v1.Pod{}): true, + reflect.TypeOf(&v1.Node{}): true, + }, + expectDynamicInformers: map[schema.GroupVersionResource]bool{ + {Group: "apps", Version: "v1", Resource: "daemonsets"}: true, + }, + }, + } + + scheme := runtime.NewScheme() + var localSchemeBuilder = runtime.SchemeBuilder{ + appsv1.AddToScheme, + batchv1.AddToScheme, + } + localSchemeBuilder.AddToScheme(scheme) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stopCh := make(chan struct{}) + testSched := Scheduler{ + StopEverything: stopCh, + SchedulingQueue: queue.NewTestQueue(context.Background(), nil), + } + + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, 0) + dynclient := dyfake.NewSimpleDynamicClient(scheme) + dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0) + + addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap) + + informerFactory.Start(testSched.StopEverything) + dynInformerFactory.Start(testSched.StopEverything) + staticInformers := informerFactory.WaitForCacheSync(testSched.StopEverything) + dynamicInformers := dynInformerFactory.WaitForCacheSync(testSched.StopEverything) + + if diff := cmp.Diff(tt.expectStaticInformers, staticInformers); diff != "" { + t.Errorf("Unexpected diff (-want, +got):\n%s", diff) + } + if diff := cmp.Diff(tt.expectDynamicInformers, dynamicInformers); diff != "" { + t.Errorf("Unexpected diff (-want, +got):\n%s", diff) + } + }) + } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 7e8b0e20fdf..16841f84bd2 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -316,6 +316,11 @@ func New(client clientset.Interface, } addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap)) + + if dynInformerFactory != nil { + dynInformerFactory.Start(stopEverything) + } + return sched, nil }