diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index e39243f91d3..467671bc669 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -18,6 +18,8 @@ package preemption import ( "context" + "errors" + "fmt" "sort" "testing" @@ -28,6 +30,8 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/events" "k8s.io/klog/v2/ktesting" extenderv1 "k8s.io/kube-scheduler/extender/v1" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" @@ -355,3 +359,479 @@ func TestSelectCandidate(t *testing.T) { }) } } + +type fakeCandidate struct { + victims *extenderv1.Victims + name string +} + +// Victims returns s.victims. +func (s *fakeCandidate) Victims() *extenderv1.Victims { + return s.victims +} + +// Name returns s.name. +func (s *fakeCandidate) Name() string { + return s.name +} + +func TestPrepareCandidate(t *testing.T) { + var ( + node1Name = "node1" + defaultSchedulerName = "default-scheduler" + ) + condition := v1.PodCondition{ + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + Reason: v1.PodReasonPreemptionByScheduler, + Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", defaultSchedulerName), + } + + var ( + victim1 = st.MakePod().Name("victim1").UID("victim1"). + Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() + + victim2 = st.MakePod().Name("victim2").UID("victim2"). + Node(node1Name).SchedulerName(defaultSchedulerName).Priority(50000). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() + + victim1WithMatchingCondition = st.MakePod().Name("victim1").UID("victim1"). + Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). + Conditions([]v1.PodCondition{condition}). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() + + preemptor = st.MakePod().Name("preemptor").UID("preemptor"). + SchedulerName(defaultSchedulerName).Priority(highPriority). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() + ) + + tests := []struct { + name string + nodeNames []string + candidate *fakeCandidate + preemptor *v1.Pod + testPods []*v1.Pod + expectedStatus *framework.Status + }{ + { + name: "no victims", + candidate: &fakeCandidate{ + victims: &extenderv1.Victims{}, + }, + preemptor: preemptor, + testPods: []*v1.Pod{ + victim1, + }, + nodeNames: []string{node1Name}, + expectedStatus: nil, + }, + { + name: "one victim without condition", + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + victim1, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{ + victim1, + }, + nodeNames: []string{node1Name}, + expectedStatus: nil, + }, + { + name: "one victim with same condition", + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + victim1WithMatchingCondition, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{ + victim1WithMatchingCondition, + }, + nodeNames: []string{node1Name}, + expectedStatus: nil, + }, + { + name: "one victim, but patch pod failed (not found victim1 pod)", + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + victim1WithMatchingCondition, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{}, + nodeNames: []string{node1Name}, + expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), + }, + { + name: "one victim, but delete pod failed (not found victim1 pod)", + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + victim1, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{}, + nodeNames: []string{node1Name}, + expectedStatus: framework.AsStatus(errors.New("delete pod failed")), + }, + { + name: "two victims without condition, one passes successfully and the second fails (not found victim2 pod)", + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + victim1, + victim2, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{ + victim1, + }, + nodeNames: []string{node1Name}, + expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics.Register() + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + nodes := make([]*v1.Node, len(tt.nodeNames)) + for i, nodeName := range tt.nodeNames { + nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj() + } + registeredPlugins := append([]tf.RegisterPluginFunc{ + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)}, + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ) + var objs []runtime.Object + for _, pod := range tt.testPods { + objs = append(objs, pod) + } + cs := clientsetfake.NewClientset(objs...) + informerFactory := informers.NewSharedInformerFactory(cs, 0) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()}) + fwk, err := tf.NewFramework( + ctx, + registeredPlugins, "", + frameworkruntime.WithClientSet(cs), + frameworkruntime.WithLogger(logger), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), + frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), + frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")), + ) + if err != nil { + t.Fatal(err) + } + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{} + pe := Evaluator{ + PluginName: "FakePreemptionScorePostFilter", + Handler: fwk, + Interface: fakePreemptionScorePostFilterPlugin, + State: framework.NewCycleState(), + } + + status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin") + if tt.expectedStatus == nil { + if status != nil { + t.Errorf("expect nil status, but got %v", status) + } + } else { + if status == nil { + t.Errorf("expect status %v, but got nil", tt.expectedStatus) + } else if status.Code() != tt.expectedStatus.Code() { + t.Errorf("expect status code %v, but got %v", tt.expectedStatus.Code(), status.Code()) + } + } + }) + } +} + +type fakeExtender struct { + ignorable bool + errProcessPreemption bool + supportsPreemption bool + returnsNoVictims bool +} + +func newFakeExtender() *fakeExtender { + return &fakeExtender{} +} + +func (f *fakeExtender) WithIgnorable(ignorable bool) *fakeExtender { + f.ignorable = ignorable + return f +} + +func (f *fakeExtender) WithErrProcessPreemption(errProcessPreemption bool) *fakeExtender { + f.errProcessPreemption = errProcessPreemption + return f +} + +func (f *fakeExtender) WithSupportsPreemption(supportsPreemption bool) *fakeExtender { + f.supportsPreemption = supportsPreemption + return f +} + +func (f *fakeExtender) WithReturnNoVictims(returnsNoVictims bool) *fakeExtender { + f.returnsNoVictims = returnsNoVictims + return f +} + +func (f *fakeExtender) Name() string { + return "fakeExtender" +} + +func (f *fakeExtender) IsIgnorable() bool { + return f.ignorable +} + +func (f *fakeExtender) ProcessPreemption( + _ *v1.Pod, + victims map[string]*extenderv1.Victims, + _ framework.NodeInfoLister, +) (map[string]*extenderv1.Victims, error) { + if f.supportsPreemption { + if f.errProcessPreemption { + return nil, errors.New("extender preempt error") + } + if f.returnsNoVictims { + return map[string]*extenderv1.Victims{"mock": {}}, nil + } + return victims, nil + } + return nil, nil +} + +func (f *fakeExtender) SupportsPreemption() bool { + return f.supportsPreemption +} + +func (f *fakeExtender) IsInterested(pod *v1.Pod) bool { + return pod != nil +} + +func (f *fakeExtender) Filter(_ *v1.Pod, _ []*framework.NodeInfo) ([]*framework.NodeInfo, extenderv1.FailedNodesMap, extenderv1.FailedNodesMap, error) { + return nil, nil, nil, nil +} + +func (f *fakeExtender) Prioritize( + _ *v1.Pod, + _ []*framework.NodeInfo, +) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) { + return nil, 0, nil +} + +func (f *fakeExtender) Bind(_ *v1.Binding) error { + return nil +} + +func (f *fakeExtender) IsBinder() bool { + return true +} + +func (f *fakeExtender) IsPrioritizer() bool { + return true +} + +func (f *fakeExtender) IsFilter() bool { + return true +} + +func TestCallExtenders(t *testing.T) { + var ( + node1Name = "node1" + defaultSchedulerName = "default-scheduler" + preemptor = st.MakePod().Name("preemptor").UID("preemptor"). + SchedulerName(defaultSchedulerName).Priority(highPriority). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() + victim = st.MakePod().Name("victim").UID("victim"). + Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() + makeCandidates = func(nodeName string, pods ...*v1.Pod) []Candidate { + return []Candidate{ + &fakeCandidate{ + name: nodeName, + victims: &extenderv1.Victims{ + Pods: pods, + }, + }, + } + } + ) + tests := []struct { + name string + extenders []framework.Extender + candidates []Candidate + wantStatus *framework.Status + wantCandidates []Candidate + }{ + { + name: "no extenders", + extenders: []framework.Extender{}, + candidates: makeCandidates(node1Name, victim), + wantStatus: nil, + wantCandidates: makeCandidates(node1Name, victim), + }, + { + name: "one extender supports preemption", + extenders: []framework.Extender{ + newFakeExtender().WithSupportsPreemption(true), + }, + candidates: makeCandidates(node1Name, victim), + wantStatus: nil, + wantCandidates: makeCandidates(node1Name, victim), + }, + { + name: "one extender with return no victims", + extenders: []framework.Extender{ + newFakeExtender().WithSupportsPreemption(true).WithReturnNoVictims(true), + }, + candidates: makeCandidates(node1Name, victim), + wantStatus: framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", node1Name)), + wantCandidates: []Candidate{}, + }, + { + name: "one extender does not support preemption", + extenders: []framework.Extender{ + newFakeExtender().WithSupportsPreemption(false), + }, + candidates: makeCandidates(node1Name, victim), + wantStatus: nil, + wantCandidates: makeCandidates(node1Name, victim), + }, + { + name: "one extender with no return victims and is ignorable", + extenders: []framework.Extender{ + newFakeExtender().WithSupportsPreemption(true). + WithReturnNoVictims(true).WithIgnorable(true), + }, + candidates: makeCandidates(node1Name, victim), + wantStatus: nil, + wantCandidates: []Candidate{}, + }, + { + name: "one extender returns error and is ignorable", + extenders: []framework.Extender{ + newFakeExtender().WithIgnorable(true). + WithSupportsPreemption(true).WithErrProcessPreemption(true), + }, + candidates: makeCandidates(node1Name, victim), + wantStatus: nil, + wantCandidates: makeCandidates(node1Name, victim), + }, + { + name: "one extender returns error and is not ignorable", + extenders: []framework.Extender{ + newFakeExtender().WithErrProcessPreemption(true). + WithSupportsPreemption(true), + }, + candidates: makeCandidates(node1Name, victim), + wantStatus: framework.AsStatus(fmt.Errorf("extender preempt error")), + wantCandidates: nil, + }, + { + name: "one extender with empty victims input", + extenders: []framework.Extender{ + newFakeExtender().WithSupportsPreemption(true), + }, + candidates: []Candidate{}, + wantStatus: nil, + wantCandidates: []Candidate{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics.Register() + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + nodes := make([]*v1.Node, len([]string{node1Name})) + for i, nodeName := range []string{node1Name} { + nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj() + } + registeredPlugins := append([]tf.RegisterPluginFunc{ + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)}, + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ) + var objs []runtime.Object + objs = append(objs, preemptor) + cs := clientsetfake.NewClientset(objs...) + informerFactory := informers.NewSharedInformerFactory(cs, 0) + fwk, err := tf.NewFramework( + ctx, + registeredPlugins, "", + frameworkruntime.WithClientSet(cs), + frameworkruntime.WithLogger(logger), + frameworkruntime.WithExtenders(tt.extenders), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot([]*v1.Pod{preemptor}, nodes)), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), + ) + if err != nil { + t.Fatal(err) + } + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + + fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{} + pe := Evaluator{ + PluginName: "FakePreemptionScorePostFilter", + Handler: fwk, + Interface: fakePreemptionScorePostFilterPlugin, + State: framework.NewCycleState(), + } + gotCandidates, status := pe.callExtenders(logger, preemptor, tt.candidates) + if (tt.wantStatus == nil) != (status == nil) || status.Code() != tt.wantStatus.Code() { + t.Errorf("callExtenders() status mismatch. got: %v, want: %v", status, tt.wantStatus) + } + + if len(gotCandidates) != len(tt.wantCandidates) { + t.Errorf("callExtenders() returned unexpected number of results. got: %d, want: %d", len(gotCandidates), len(tt.wantCandidates)) + } else { + for i, gotCandidate := range gotCandidates { + wantCandidate := tt.wantCandidates[i] + if gotCandidate.Name() != wantCandidate.Name() { + t.Errorf("callExtenders() node name mismatch. got: %s, want: %s", gotCandidate.Name(), wantCandidate.Name()) + } + if len(gotCandidate.Victims().Pods) != len(wantCandidate.Victims().Pods) { + t.Errorf("callExtenders() number of victim pods mismatch for node %s. got: %d, want: %d", gotCandidate.Name(), len(gotCandidate.Victims().Pods), len(wantCandidate.Victims().Pods)) + } + } + } + }) + } +}