Merge pull request #128022 from googs1025/cleanup/ut/preemption

chore(scheduler): add unit test for framework preemption part
This commit is contained in:
Kubernetes Prow Robot 2024-10-29 13:16:55 +00:00 committed by GitHub
commit d09d98e07c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -18,6 +18,8 @@ package preemption
import (
"context"
"errors"
"fmt"
"sort"
"testing"
@ -28,6 +30,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2/ktesting"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
@ -355,3 +359,479 @@ func TestSelectCandidate(t *testing.T) {
})
}
}
type fakeCandidate struct {
victims *extenderv1.Victims
name string
}
// Victims returns s.victims.
func (s *fakeCandidate) Victims() *extenderv1.Victims {
return s.victims
}
// Name returns s.name.
func (s *fakeCandidate) Name() string {
return s.name
}
func TestPrepareCandidate(t *testing.T) {
var (
node1Name = "node1"
defaultSchedulerName = "default-scheduler"
)
condition := v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonPreemptionByScheduler,
Message: fmt.Sprintf("%s: preempting to accommodate a higher priority pod", defaultSchedulerName),
}
var (
victim1 = st.MakePod().Name("victim1").UID("victim1").
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
victim2 = st.MakePod().Name("victim2").UID("victim2").
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(50000).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
victim1WithMatchingCondition = st.MakePod().Name("victim1").UID("victim1").
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
Conditions([]v1.PodCondition{condition}).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
preemptor = st.MakePod().Name("preemptor").UID("preemptor").
SchedulerName(defaultSchedulerName).Priority(highPriority).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
)
tests := []struct {
name string
nodeNames []string
candidate *fakeCandidate
preemptor *v1.Pod
testPods []*v1.Pod
expectedStatus *framework.Status
}{
{
name: "no victims",
candidate: &fakeCandidate{
victims: &extenderv1.Victims{},
},
preemptor: preemptor,
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
},
{
name: "one victim without condition",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
victim1,
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
},
{
name: "one victim with same condition",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
victim1WithMatchingCondition,
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{
victim1WithMatchingCondition,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
},
{
name: "one victim, but patch pod failed (not found victim1 pod)",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
victim1WithMatchingCondition,
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
},
{
name: "one victim, but delete pod failed (not found victim1 pod)",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
victim1,
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("delete pod failed")),
},
{
name: "two victims without condition, one passes successfully and the second fails (not found victim2 pod)",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
victim1,
victim2,
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics.Register()
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodes := make([]*v1.Node, len(tt.nodeNames))
for i, nodeName := range tt.nodeNames {
nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
}
registeredPlugins := append([]tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
var objs []runtime.Object
for _, pod := range tt.testPods {
objs = append(objs, pod)
}
cs := clientsetfake.NewClientset(objs...)
informerFactory := informers.NewSharedInformerFactory(cs, 0)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()})
fwk, err := tf.NewFramework(
ctx,
registeredPlugins, "",
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithLogger(logger),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)),
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")),
)
if err != nil {
t.Fatal(err)
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}
pe := Evaluator{
PluginName: "FakePreemptionScorePostFilter",
Handler: fwk,
Interface: fakePreemptionScorePostFilterPlugin,
State: framework.NewCycleState(),
}
status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin")
if tt.expectedStatus == nil {
if status != nil {
t.Errorf("expect nil status, but got %v", status)
}
} else {
if status == nil {
t.Errorf("expect status %v, but got nil", tt.expectedStatus)
} else if status.Code() != tt.expectedStatus.Code() {
t.Errorf("expect status code %v, but got %v", tt.expectedStatus.Code(), status.Code())
}
}
})
}
}
type fakeExtender struct {
ignorable bool
errProcessPreemption bool
supportsPreemption bool
returnsNoVictims bool
}
func newFakeExtender() *fakeExtender {
return &fakeExtender{}
}
func (f *fakeExtender) WithIgnorable(ignorable bool) *fakeExtender {
f.ignorable = ignorable
return f
}
func (f *fakeExtender) WithErrProcessPreemption(errProcessPreemption bool) *fakeExtender {
f.errProcessPreemption = errProcessPreemption
return f
}
func (f *fakeExtender) WithSupportsPreemption(supportsPreemption bool) *fakeExtender {
f.supportsPreemption = supportsPreemption
return f
}
func (f *fakeExtender) WithReturnNoVictims(returnsNoVictims bool) *fakeExtender {
f.returnsNoVictims = returnsNoVictims
return f
}
func (f *fakeExtender) Name() string {
return "fakeExtender"
}
func (f *fakeExtender) IsIgnorable() bool {
return f.ignorable
}
func (f *fakeExtender) ProcessPreemption(
_ *v1.Pod,
victims map[string]*extenderv1.Victims,
_ framework.NodeInfoLister,
) (map[string]*extenderv1.Victims, error) {
if f.supportsPreemption {
if f.errProcessPreemption {
return nil, errors.New("extender preempt error")
}
if f.returnsNoVictims {
return map[string]*extenderv1.Victims{"mock": {}}, nil
}
return victims, nil
}
return nil, nil
}
func (f *fakeExtender) SupportsPreemption() bool {
return f.supportsPreemption
}
func (f *fakeExtender) IsInterested(pod *v1.Pod) bool {
return pod != nil
}
func (f *fakeExtender) Filter(_ *v1.Pod, _ []*framework.NodeInfo) ([]*framework.NodeInfo, extenderv1.FailedNodesMap, extenderv1.FailedNodesMap, error) {
return nil, nil, nil, nil
}
func (f *fakeExtender) Prioritize(
_ *v1.Pod,
_ []*framework.NodeInfo,
) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) {
return nil, 0, nil
}
func (f *fakeExtender) Bind(_ *v1.Binding) error {
return nil
}
func (f *fakeExtender) IsBinder() bool {
return true
}
func (f *fakeExtender) IsPrioritizer() bool {
return true
}
func (f *fakeExtender) IsFilter() bool {
return true
}
func TestCallExtenders(t *testing.T) {
var (
node1Name = "node1"
defaultSchedulerName = "default-scheduler"
preemptor = st.MakePod().Name("preemptor").UID("preemptor").
SchedulerName(defaultSchedulerName).Priority(highPriority).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
victim = st.MakePod().Name("victim").UID("victim").
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
makeCandidates = func(nodeName string, pods ...*v1.Pod) []Candidate {
return []Candidate{
&fakeCandidate{
name: nodeName,
victims: &extenderv1.Victims{
Pods: pods,
},
},
}
}
)
tests := []struct {
name string
extenders []framework.Extender
candidates []Candidate
wantStatus *framework.Status
wantCandidates []Candidate
}{
{
name: "no extenders",
extenders: []framework.Extender{},
candidates: makeCandidates(node1Name, victim),
wantStatus: nil,
wantCandidates: makeCandidates(node1Name, victim),
},
{
name: "one extender supports preemption",
extenders: []framework.Extender{
newFakeExtender().WithSupportsPreemption(true),
},
candidates: makeCandidates(node1Name, victim),
wantStatus: nil,
wantCandidates: makeCandidates(node1Name, victim),
},
{
name: "one extender with return no victims",
extenders: []framework.Extender{
newFakeExtender().WithSupportsPreemption(true).WithReturnNoVictims(true),
},
candidates: makeCandidates(node1Name, victim),
wantStatus: framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", node1Name)),
wantCandidates: []Candidate{},
},
{
name: "one extender does not support preemption",
extenders: []framework.Extender{
newFakeExtender().WithSupportsPreemption(false),
},
candidates: makeCandidates(node1Name, victim),
wantStatus: nil,
wantCandidates: makeCandidates(node1Name, victim),
},
{
name: "one extender with no return victims and is ignorable",
extenders: []framework.Extender{
newFakeExtender().WithSupportsPreemption(true).
WithReturnNoVictims(true).WithIgnorable(true),
},
candidates: makeCandidates(node1Name, victim),
wantStatus: nil,
wantCandidates: []Candidate{},
},
{
name: "one extender returns error and is ignorable",
extenders: []framework.Extender{
newFakeExtender().WithIgnorable(true).
WithSupportsPreemption(true).WithErrProcessPreemption(true),
},
candidates: makeCandidates(node1Name, victim),
wantStatus: nil,
wantCandidates: makeCandidates(node1Name, victim),
},
{
name: "one extender returns error and is not ignorable",
extenders: []framework.Extender{
newFakeExtender().WithErrProcessPreemption(true).
WithSupportsPreemption(true),
},
candidates: makeCandidates(node1Name, victim),
wantStatus: framework.AsStatus(fmt.Errorf("extender preempt error")),
wantCandidates: nil,
},
{
name: "one extender with empty victims input",
extenders: []framework.Extender{
newFakeExtender().WithSupportsPreemption(true),
},
candidates: []Candidate{},
wantStatus: nil,
wantCandidates: []Candidate{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics.Register()
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodes := make([]*v1.Node, len([]string{node1Name}))
for i, nodeName := range []string{node1Name} {
nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
}
registeredPlugins := append([]tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
var objs []runtime.Object
objs = append(objs, preemptor)
cs := clientsetfake.NewClientset(objs...)
informerFactory := informers.NewSharedInformerFactory(cs, 0)
fwk, err := tf.NewFramework(
ctx,
registeredPlugins, "",
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithLogger(logger),
frameworkruntime.WithExtenders(tt.extenders),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot([]*v1.Pod{preemptor}, nodes)),
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
)
if err != nil {
t.Fatal(err)
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}
pe := Evaluator{
PluginName: "FakePreemptionScorePostFilter",
Handler: fwk,
Interface: fakePreemptionScorePostFilterPlugin,
State: framework.NewCycleState(),
}
gotCandidates, status := pe.callExtenders(logger, preemptor, tt.candidates)
if (tt.wantStatus == nil) != (status == nil) || status.Code() != tt.wantStatus.Code() {
t.Errorf("callExtenders() status mismatch. got: %v, want: %v", status, tt.wantStatus)
}
if len(gotCandidates) != len(tt.wantCandidates) {
t.Errorf("callExtenders() returned unexpected number of results. got: %d, want: %d", len(gotCandidates), len(tt.wantCandidates))
} else {
for i, gotCandidate := range gotCandidates {
wantCandidate := tt.wantCandidates[i]
if gotCandidate.Name() != wantCandidate.Name() {
t.Errorf("callExtenders() node name mismatch. got: %s, want: %s", gotCandidate.Name(), wantCandidate.Name())
}
if len(gotCandidate.Victims().Pods) != len(wantCandidate.Victims().Pods) {
t.Errorf("callExtenders() number of victim pods mismatch for node %s. got: %d, want: %d", gotCandidate.Name(), len(gotCandidate.Victims().Pods), len(wantCandidate.Victims().Pods))
}
}
}
})
}
}