sched: schedfwk: init indexers with non-nil map

Using a nil map to initialize the pod indexers will
cause runtime failure when trying to add indexers
in scheduler plugin.
We use a empty map to enable scheduler plugins
to add their indexers.

Signed-off-by: Francesco Romani <fromani@redhat.com>
This commit is contained in:
Francesco Romani 2022-06-20 14:39:41 +02:00
parent 5d7fdf1f12
commit b4e015bf3c
2 changed files with 122 additions and 1 deletions

View File

@ -452,10 +452,11 @@ func unionedGVKs(m map[framework.ClusterEvent]sets.String) map[framework.GVK]fra
}
// newPodInformer creates a shared index informer that returns only non-terminal pods.
// The PodInformer allows indexers to be added, but note that only non-conflict indexers are allowed.
func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = selector
}
return coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, nil, tweakListOptions)
return coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, cache.Indexers{}, tweakListOptions)
}

View File

@ -28,6 +28,7 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
@ -472,3 +473,122 @@ func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue intern
return s, fwk, nil
}
func TestInitPluginsWithIndexers(t *testing.T) {
tests := []struct {
name string
// the plugin registration ordering must not matter, being map traversal random
entrypoints map[string]frameworkruntime.PluginFactory
wantErr string
}{
{
name: "register indexer, no conflicts",
entrypoints: map[string]frameworkruntime.PluginFactory{
"AddIndexer": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
podInformer := handle.SharedInformerFactory().Core().V1().Pods()
err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
"nodeName": indexByPodSpecNodeName,
})
return &TestPlugin{name: "AddIndexer"}, err
},
},
},
{
name: "register the same indexer name multiple times, conflict",
// order of registration doesn't matter
entrypoints: map[string]frameworkruntime.PluginFactory{
"AddIndexer1": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
podInformer := handle.SharedInformerFactory().Core().V1().Pods()
err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
"nodeName": indexByPodSpecNodeName,
})
return &TestPlugin{name: "AddIndexer1"}, err
},
"AddIndexer2": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
podInformer := handle.SharedInformerFactory().Core().V1().Pods()
err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
"nodeName": indexByPodAnnotationNodeName,
})
return &TestPlugin{name: "AddIndexer1"}, err
},
},
wantErr: "indexer conflict",
},
{
name: "register the same indexer body with different names, no conflicts",
// order of registration doesn't matter
entrypoints: map[string]frameworkruntime.PluginFactory{
"AddIndexer1": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
podInformer := handle.SharedInformerFactory().Core().V1().Pods()
err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
"nodeName1": indexByPodSpecNodeName,
})
return &TestPlugin{name: "AddIndexer1"}, err
},
"AddIndexer2": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
podInformer := handle.SharedInformerFactory().Core().V1().Pods()
err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
"nodeName2": indexByPodAnnotationNodeName,
})
return &TestPlugin{name: "AddIndexer2"}, err
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fakeInformerFactory := NewInformerFactory(&fake.Clientset{}, 0*time.Second)
var registerPluginFuncs []st.RegisterPluginFunc
for name, entrypoint := range tt.entrypoints {
registerPluginFuncs = append(registerPluginFuncs,
// anything supported by TestPlugin is fine
st.RegisterFilterPlugin(name, entrypoint),
)
}
// we always need this
registerPluginFuncs = append(registerPluginFuncs,
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
_, err := st.NewFramework(registerPluginFuncs, "test", frameworkruntime.WithInformerFactory(fakeInformerFactory))
if len(tt.wantErr) > 0 {
if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
t.Errorf("got error %q, want %q", err, tt.wantErr)
}
return
}
if err != nil {
t.Fatalf("Failed to create scheduler: %v", err)
}
})
}
}
func indexByPodSpecNodeName(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Spec.NodeName) == 0 {
return []string{}, nil
}
return []string{pod.Spec.NodeName}, nil
}
func indexByPodAnnotationNodeName(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Annotations) == 0 {
return []string{}, nil
}
nodeName, ok := pod.Annotations["node-name"]
if !ok {
return []string{}, nil
}
return []string{nodeName}, nil
}