/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/events"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
	"k8s.io/kubernetes/pkg/scheduler/profile"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
	testingclock "k8s.io/utils/clock/testing"
	"k8s.io/utils/pointer"
)

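// TestSchedulerCreation verifies that New builds a Scheduler with the expected profiles and
// extenders, and that invalid configurations (duplicate plugin or profile names) surface the
// expected errors.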
func TestSchedulerCreation(t *testing.T) {
	invalidRegistry := map[string]frameworkruntime.PluginFactory{
		defaultbinder.Name: defaultbinder.New,
	}
	validRegistry := map[string]frameworkruntime.PluginFactory{
		"Foo": defaultbinder.New,
	}
	cases := []struct {
		name          string
		opts          []Option
		wantErr       string
		wantProfiles  []string
		wantExtenders []string
	}{
		{
			name: "valid out-of-tree registry",
			opts: []Option{
				WithFrameworkOutOfTreeRegistry(validRegistry),
				WithProfiles(
					schedulerapi.KubeSchedulerProfile{
						SchedulerName: "default-scheduler",
						Plugins: &schedulerapi.Plugins{
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
						},
					},
				)},
			wantProfiles: []string{"default-scheduler"},
		},
		{
			name: "repeated plugin name in out-of-tree plugin",
			opts: []Option{
				WithFrameworkOutOfTreeRegistry(invalidRegistry),
				WithProfiles(
					schedulerapi.KubeSchedulerProfile{
						SchedulerName: "default-scheduler",
						Plugins: &schedulerapi.Plugins{
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
						},
					},
				)},
			wantProfiles: []string{"default-scheduler"},
			wantErr:      "a plugin named DefaultBinder already exists",
		},
		{
			name: "multiple profiles",
			opts: []Option{
				WithProfiles(
					schedulerapi.KubeSchedulerProfile{
						SchedulerName: "foo",
						Plugins: &schedulerapi.Plugins{
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
						},
					},
					schedulerapi.KubeSchedulerProfile{
						SchedulerName: "bar",
						Plugins: &schedulerapi.Plugins{
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
						},
					},
				)},
			wantProfiles: []string{"bar", "foo"},
		},
		{
			name: "Repeated profiles",
			opts: []Option{
				WithProfiles(
					schedulerapi.KubeSchedulerProfile{
						SchedulerName: "foo",
						Plugins: &schedulerapi.Plugins{
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
						},
					},
					schedulerapi.KubeSchedulerProfile{
						SchedulerName: "bar",
						Plugins: &schedulerapi.Plugins{
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
						},
					},
					schedulerapi.KubeSchedulerProfile{
						SchedulerName: "foo",
						Plugins: &schedulerapi.Plugins{
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
						},
					},
				)},
			wantErr: "duplicate profile with scheduler name \"foo\"",
		},
		{
			name: "With extenders",
			opts: []Option{
				WithProfiles(
					schedulerapi.KubeSchedulerProfile{
						SchedulerName: "default-scheduler",
						Plugins: &schedulerapi.Plugins{
							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
						},
					},
				),
				WithExtenders(
					schedulerapi.Extender{
						URLPrefix: "http://extender.kube-system/",
					},
				),
			},
			wantProfiles:  []string{"default-scheduler"},
			wantExtenders: []string{"http://extender.kube-system/"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			client := fake.NewSimpleClientset()
			informerFactory := informers.NewSharedInformerFactory(client, 0)

			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})

			stopCh := make(chan struct{})
			defer close(stopCh)
			s, err := New(
				client,
				informerFactory,
				nil,
				profile.NewRecorderFactory(eventBroadcaster),
				stopCh,
				tc.opts...,
			)

			// Errors
			if len(tc.wantErr) != 0 {
				if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
					t.Errorf("got error %q, want %q", err, tc.wantErr)
				}
				return
			}
			if err != nil {
				t.Fatalf("Failed to create scheduler: %v", err)
			}

			// Profiles
			profiles := make([]string, 0, len(s.Profiles))
			for name := range s.Profiles {
				profiles = append(profiles, name)
			}
			sort.Strings(profiles)
			if diff := cmp.Diff(tc.wantProfiles, profiles); diff != "" {
				t.Errorf("unexpected profiles (-want, +got):\n%s", diff)
			}

			// Extenders
			if len(tc.wantExtenders) != 0 {
				// Scheduler.Extenders
				extenders := make([]string, 0, len(s.Extenders))
				for _, e := range s.Extenders {
					extenders = append(extenders, e.Name())
				}
				if diff := cmp.Diff(tc.wantExtenders, extenders); diff != "" {
					t.Errorf("unexpected extenders (-want, +got):\n%s", diff)
				}

				// framework.Handle.Extenders()
				for _, p := range s.Profiles {
					extenders := make([]string, 0, len(p.Extenders()))
					for _, e := range p.Extenders() {
						extenders = append(extenders, e.Name())
					}
					if diff := cmp.Diff(tc.wantExtenders, extenders); diff != "" {
						t.Errorf("unexpected extenders (-want, +got):\n%s", diff)
					}
				}
			}
		})
	}
}

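// TestFailureHandler verifies that the failure handler requeues the right version of a pod
// depending on whether the pod was updated or deleted while its scheduling cycle was in flight.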
func TestFailureHandler(t *testing.T) {
	testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Obj()
	testPodUpdated := testPod.DeepCopy()
	testPodUpdated.Labels = map[string]string{"foo": ""}

	tests := []struct {
		name                       string
		podUpdatedDuringScheduling bool // pod is updated during a scheduling cycle
		podDeletedDuringScheduling bool // pod is deleted during a scheduling cycle
		expect                     *v1.Pod
	}{
		{
			name:                       "pod is updated during a scheduling cycle",
			podUpdatedDuringScheduling: true,
			expect:                     testPodUpdated,
		},
		{
			name:   "pod is not updated during a scheduling cycle",
			expect: testPod,
		},
		{
			name:                       "pod is deleted during a scheduling cycle",
			podDeletedDuringScheduling: true,
			expect:                     nil,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

			client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
			informerFactory := informers.NewSharedInformerFactory(client, 0)
			podInformer := informerFactory.Core().V1().Pods()
			// Need to add/update/delete testPod to the store.
			podInformer.Informer().GetStore().Add(testPod)

			queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
			schedulerCache := internalcache.New(30*time.Second, ctx.Done())

			queue.Add(testPod)
			queue.Pop()

			if tt.podUpdatedDuringScheduling {
				podInformer.Informer().GetStore().Update(testPodUpdated)
				queue.Update(testPod, testPodUpdated)
			}
			if tt.podDeletedDuringScheduling {
				podInformer.Informer().GetStore().Delete(testPod)
				queue.Delete(testPod)
			}

			s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory)
			if err != nil {
				t.Fatal(err)
			}

			testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
			s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable), nil, time.Now())

			var got *v1.Pod
			if tt.podUpdatedDuringScheduling {
				head, e := queue.Pop()
				if e != nil {
					t.Fatalf("Cannot pop pod from the activeQ: %v", e)
				}
				got = head.Pod
			} else {
				got = getPodFromPriorityQueue(queue, testPod)
			}

			if diff := cmp.Diff(tt.expect, got); diff != "" {
				t.Errorf("Unexpected pod (-want, +got): %s", diff)
			}
		})
	}
}

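// TestFailureHandler_NodeNotFound verifies that the scheduler cache drops a node only when the
// node is actually gone from the API server, even if a node-NotFound error is reported.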
func TestFailureHandler_NodeNotFound(t *testing.T) {
	nodeFoo := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
	nodeBar := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
	testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Obj()
	tests := []struct {
		name             string
		nodes            []v1.Node
		nodeNameToDelete string
		injectErr        error
		expectNodeNames  sets.String
	}{
		{
			name:             "node is deleted during a scheduling cycle",
			nodes:            []v1.Node{*nodeFoo, *nodeBar},
			nodeNameToDelete: "foo",
			injectErr:        apierrors.NewNotFound(v1.Resource("node"), nodeFoo.Name),
			expectNodeNames:  sets.NewString("bar"),
		},
		{
			name:            "node is not deleted but NodeNotFound is received incorrectly",
			nodes:           []v1.Node{*nodeFoo, *nodeBar},
			injectErr:       apierrors.NewNotFound(v1.Resource("node"), nodeFoo.Name),
			expectNodeNames: sets.NewString("foo", "bar"),
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

			client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: tt.nodes})
			informerFactory := informers.NewSharedInformerFactory(client, 0)
			podInformer := informerFactory.Core().V1().Pods()
			// Need to add testPod to the store.
			podInformer.Informer().GetStore().Add(testPod)

			queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
			schedulerCache := internalcache.New(30*time.Second, ctx.Done())

			for i := range tt.nodes {
				node := tt.nodes[i]
				// Add the node to schedulerCache regardless of whether it gets deleted in the API server.
				schedulerCache.AddNode(&node)
				if node.Name == tt.nodeNameToDelete {
					client.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{})
				}
			}

			s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory)
			if err != nil {
				t.Fatal(err)
			}

			testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
			s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable).WithError(tt.injectErr), nil, time.Now())

			gotNodes := schedulerCache.Dump().Nodes
			gotNodeNames := sets.NewString()
			for _, nodeInfo := range gotNodes {
				gotNodeNames.Insert(nodeInfo.Node().Name)
			}
			if diff := cmp.Diff(tt.expectNodeNames, gotNodeNames); diff != "" {
				t.Errorf("Unexpected nodes (-want, +got): %s", diff)
			}
		})
	}
}

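// TestFailureHandler_PodAlreadyBound verifies that a pod that already has a NodeName set is not
// put back into the scheduling queue when its scheduling attempt fails.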
func TestFailureHandler_PodAlreadyBound(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	nodeFoo := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
	testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Node("foo").Obj()

	client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: []v1.Node{nodeFoo}})
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	podInformer := informerFactory.Core().V1().Pods()
	// Need to add testPod to the store.
	podInformer.Informer().GetStore().Add(testPod)

	queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
	schedulerCache := internalcache.New(30*time.Second, ctx.Done())

	// Add the node to schedulerCache regardless of whether it gets deleted in the API server.
	schedulerCache.AddNode(&nodeFoo)

	s, fwk, err := initScheduler(ctx.Done(), schedulerCache, queue, client, informerFactory)
	if err != nil {
		t.Fatal(err)
	}

	testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
	s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable).WithError(fmt.Errorf("binding rejected: timeout")), nil, time.Now())

	pod := getPodFromPriorityQueue(queue, testPod)
	if pod != nil {
		t.Fatalf("Unexpected pod %v in the PriorityQueue: a pod with a non-empty NodeName should not be requeued", pod.Name)
	}
}

// TestWithPercentageOfNodesToScore tests that the scheduler's percentageOfNodesToScore is set correctly.
func TestWithPercentageOfNodesToScore(t *testing.T) {
	tests := []struct {
		name                           string
		percentageOfNodesToScoreConfig *int32
		wantedPercentageOfNodesToScore int32
	}{
		{
			name:                           "percentageOfNodesScore is nil",
			percentageOfNodesToScoreConfig: nil,
			wantedPercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
		},
		{
			name:                           "percentageOfNodesScore is not nil",
			percentageOfNodesToScoreConfig: pointer.Int32(10),
			wantedPercentageOfNodesToScore: 10,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			client := fake.NewSimpleClientset()
			informerFactory := informers.NewSharedInformerFactory(client, 0)
			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
			stopCh := make(chan struct{})
			defer close(stopCh)
			sched, err := New(
				client,
				informerFactory,
				nil,
				profile.NewRecorderFactory(eventBroadcaster),
				stopCh,
				WithPercentageOfNodesToScore(tt.percentageOfNodesToScoreConfig),
			)
			if err != nil {
				t.Fatalf("Failed to create scheduler: %v", err)
			}
			if sched.percentageOfNodesToScore != tt.wantedPercentageOfNodesToScore {
				t.Errorf("scheduler.percentageOfNodesToScore = %v, want %v", sched.percentageOfNodesToScore, tt.wantedPercentageOfNodesToScore)
			}
		})
	}
}

// getPodFromPriorityQueue is a helper used by the FailureHandler tests to look up a specific pod
// in the given priority queue. It returns the pod if found, or nil otherwise.
func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
	podList, _ := queue.PendingPods()
	if len(podList) == 0 {
		return nil
	}

	queryPodKey, err := cache.MetaNamespaceKeyFunc(pod)
	if err != nil {
		return nil
	}

	for _, foundPod := range podList {
		foundPodKey, err := cache.MetaNamespaceKeyFunc(foundPod)
		if err != nil {
			return nil
		}

		if foundPodKey == queryPodKey {
			return foundPod
		}
	}

	return nil
}

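// initScheduler wires a minimal Scheduler around the given cache, queue, client, and informer
// factory, registering only the mandatory queue-sort and bind plugins. It is shared by the
// FailureHandler tests above.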
func initScheduler(stop <-chan struct{}, cache internalcache.Cache, queue internalqueue.SchedulingQueue,
	client kubernetes.Interface, informerFactory informers.SharedInformerFactory) (*Scheduler, framework.Framework, error) {
	registerPluginFuncs := []st.RegisterPluginFunc{
		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
	}
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
	fwk, err := st.NewFramework(registerPluginFuncs,
		testSchedulerName,
		stop,
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
	)
	if err != nil {
		return nil, nil, err
	}

	s := &Scheduler{
		Cache:           cache,
		client:          client,
		StopEverything:  stop,
		SchedulingQueue: queue,
		Profiles:        profile.Map{testSchedulerName: fwk},
	}
	s.applyDefaultHandlers()

	return s, fwk, nil
}

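// TestInitPluginsWithIndexers verifies that plugins can register additional pod-informer indexers
// at initialization time, and that registering conflicting indexer names fails.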
func TestInitPluginsWithIndexers(t *testing.T) {
	tests := []struct {
		name string
		// the plugin registration ordering must not matter, since map traversal is randomized
		entrypoints map[string]frameworkruntime.PluginFactory
		wantErr     string
	}{
		{
			name: "register indexer, no conflicts",
			entrypoints: map[string]frameworkruntime.PluginFactory{
				"AddIndexer": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
					err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
						"nodeName": indexByPodSpecNodeName,
					})
					return &TestPlugin{name: "AddIndexer"}, err
				},
			},
		},
		{
			name: "register the same indexer name multiple times, conflict",
			// order of registration doesn't matter
			entrypoints: map[string]frameworkruntime.PluginFactory{
				"AddIndexer1": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
					err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
						"nodeName": indexByPodSpecNodeName,
					})
					return &TestPlugin{name: "AddIndexer1"}, err
				},
				"AddIndexer2": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
					err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
						"nodeName": indexByPodAnnotationNodeName,
					})
					return &TestPlugin{name: "AddIndexer2"}, err
				},
			},
			wantErr: "indexer conflict",
		},
		{
			name: "register the same indexer body with different names, no conflicts",
			// order of registration doesn't matter
			entrypoints: map[string]frameworkruntime.PluginFactory{
				"AddIndexer1": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
					err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
						"nodeName1": indexByPodSpecNodeName,
					})
					return &TestPlugin{name: "AddIndexer1"}, err
				},
				"AddIndexer2": func(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
					err := podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
						"nodeName2": indexByPodAnnotationNodeName,
					})
					return &TestPlugin{name: "AddIndexer2"}, err
				},
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			fakeInformerFactory := NewInformerFactory(&fake.Clientset{}, 0*time.Second)

			var registerPluginFuncs []st.RegisterPluginFunc
			for name, entrypoint := range tt.entrypoints {
				registerPluginFuncs = append(registerPluginFuncs,
					// anything supported by TestPlugin is fine
					st.RegisterFilterPlugin(name, entrypoint),
				)
			}
			// a queue sort and a bind plugin are always required
			registerPluginFuncs = append(registerPluginFuncs,
				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
			)
			stopCh := make(chan struct{})
			defer close(stopCh)
			_, err := st.NewFramework(registerPluginFuncs, "test", stopCh, frameworkruntime.WithInformerFactory(fakeInformerFactory))

			if len(tt.wantErr) > 0 {
				if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
					t.Errorf("got error %q, want %q", err, tt.wantErr)
				}
				return
			}
			if err != nil {
				t.Fatalf("Failed to create the framework: %v", err)
			}
		})
	}
}

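// indexByPodSpecNodeName indexes pods by spec.nodeName; pods without an assigned node produce no
// index entries.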
func indexByPodSpecNodeName(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return []string{}, nil
	}
	if len(pod.Spec.NodeName) == 0 {
		return []string{}, nil
	}
	return []string{pod.Spec.NodeName}, nil
}

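// indexByPodAnnotationNodeName indexes pods by the "node-name" annotation; pods without that
// annotation produce no index entries.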
func indexByPodAnnotationNodeName(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return []string{}, nil
	}
	if len(pod.Annotations) == 0 {
		return []string{}, nil
	}
	nodeName, ok := pod.Annotations["node-name"]
	if !ok {
		return []string{}, nil
	}
	return []string{nodeName}, nil
}