From b4e015bf3c131aea7ba1f02bd5be957e8ed89dcc Mon Sep 17 00:00:00 2001 From: Francesco Romani Date: Mon, 20 Jun 2022 14:39:41 +0200 Subject: [PATCH] sched: schedfwk: init indexers with non-nil map Using a nil map to initialize the pod indexers will cause runtime failure when trying to add indexers in scheduler plugin. We use a empty map to enable scheduler plugins to add their indexers. Signed-off-by: Francesco Romani --- pkg/scheduler/scheduler.go | 3 +- pkg/scheduler/scheduler_test.go | 120 ++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index b1558beef27..61ab633330a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -452,10 +452,11 @@ func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]fra } // newPodInformer creates a shared index informer that returns only non-terminal pods. +// The PodInformer allows indexers to be added, but note that only non-conflict indexers are allowed. func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed) tweakListOptions := func(options *metav1.ListOptions) { options.FieldSelector = selector } - return coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, nil, tweakListOptions) + return coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, cache.Indexers{}, tweakListOptions) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 28ab7d4a492..839175488b8 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -28,6 +28,7 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -472,3 +473,122 @@ func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue intern return s, fwk, nil } + +func TestInitPluginsWithIndexers(t *testing.T) { + tests := []struct { + name string + // the plugin registration ordering must not matter, being map traversal random + entrypoints map[string]frameworkruntime.PluginFactory + wantErr string + }{ + { + name: "register indexer, no conflicts", + entrypoints: map[string]frameworkruntime.PluginFactory{ + "AddIndexer": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + podInformer := handle.SharedInformerFactory().Core().V1().Pods() + err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{ + "nodeName": indexByPodSpecNodeName, + }) + return &TestPlugin{name: "AddIndexer"}, err + }, + }, + }, + { + name: "register the same indexer name multiple times, conflict", + // order of registration doesn't matter + entrypoints: map[string]frameworkruntime.PluginFactory{ + "AddIndexer1": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + podInformer := handle.SharedInformerFactory().Core().V1().Pods() + err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{ + "nodeName": indexByPodSpecNodeName, + }) + return &TestPlugin{name: "AddIndexer1"}, err + }, + "AddIndexer2": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + podInformer := handle.SharedInformerFactory().Core().V1().Pods() + err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{ + "nodeName": indexByPodAnnotationNodeName, + }) + return &TestPlugin{name: "AddIndexer1"}, err + }, + }, + wantErr: "indexer conflict", + }, + { + name: "register the same indexer body with different names, no conflicts", + // order of registration doesn't matter + entrypoints: map[string]frameworkruntime.PluginFactory{ + "AddIndexer1": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + podInformer := handle.SharedInformerFactory().Core().V1().Pods() + err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{ + "nodeName1": indexByPodSpecNodeName, + }) + return &TestPlugin{name: "AddIndexer1"}, err + }, + "AddIndexer2": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + podInformer := handle.SharedInformerFactory().Core().V1().Pods() + err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{ + "nodeName2": indexByPodAnnotationNodeName, + }) + return &TestPlugin{name: "AddIndexer2"}, err + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeInformerFactory := NewInformerFactory(&fake.Clientset{}, 0*time.Second) + + var registerPluginFuncs []st.RegisterPluginFunc + for name, entrypoint := range tt.entrypoints { + registerPluginFuncs = append(registerPluginFuncs, + // anything supported by TestPlugin is fine + st.RegisterFilterPlugin(name, entrypoint), + ) + } + // we always need this + registerPluginFuncs = append(registerPluginFuncs, + st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ) + _, err := st.NewFramework(registerPluginFuncs, "test", frameworkruntime.WithInformerFactory(fakeInformerFactory)) + + if len(tt.wantErr) > 0 { + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("got error %q, want %q", err, tt.wantErr) + } + return + } + if err != nil { + t.Fatalf("Failed to create scheduler: %v", err) + } + }) + } +} + +func indexByPodSpecNodeName(obj interface{}) ([]string, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return []string{}, nil + } + if len(pod.Spec.NodeName) == 0 { + return []string{}, nil + } + return []string{pod.Spec.NodeName}, nil +} + +func indexByPodAnnotationNodeName(obj interface{}) ([]string, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return []string{}, nil + } + if len(pod.Annotations) == 0 { + return []string{}, nil + } + nodeName, ok := pod.Annotations["node-name"] + if !ok { + return []string{}, nil + } + return []string{nodeName}, nil +}