fix: refactor TestPrepareCandidate to test async feature too

Kensei Nakada 2024-10-30 22:41:54 +09:00
parent 02eabc7d8e
commit 98bdbdcf18
2 changed files with 199 additions and 105 deletions


@@ -290,9 +290,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, state *framework.CycleState, p
// 5) Perform preparation work before nominating the selected candidate.
if ev.enableAsyncPreemption {
if status := ev.prepareCandidateAsync(bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
return nil, status
}
ev.prepareCandidateAsync(bestCandidate, pod, ev.PluginName)
} else {
if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
return nil, status
@@ -469,7 +467,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
// The Pod won't be retried until the goroutine triggered here completes.
//
// See http://kep.k8s.io/4832 for how the async preemption works.
func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName string) {
metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
// intentionally create a new context, not using a ctx from the scheduling cycle, to create ctx,
@@ -509,6 +507,14 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
// and the pod could end up stuck in the unschedulable pod pool
// because all the pod removal events would be ignored.
if len(c.Victims().Pods) == 0 {
ev.mu.Lock()
delete(ev.preempting, pod.UID)
ev.mu.Unlock()
return
}
ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
if err := errCh.ReceiveError(); err != nil {
logger.Error(err, "Error occurred during preemption")
@@ -524,8 +530,6 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name())
}()
return nil
}
func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) {
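With this change, prepareCandidateAsync no longer returns a *framework.Status: the preemptor is recorded in the preempting map under ev.mu before the goroutine starts, and the entry is removed once the async work finishes (or immediately, when there are no victims). The following is a minimal sketch of that fire-and-forget shape; asyncRunner, inFlight and runAsync are made-up names, and only the structure (synchronous registration, background work, cleanup on completion) mirrors the diff above.

```go
// Sketch of the fire-and-forget pattern, with invented names.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type asyncRunner struct {
	mu       sync.Mutex
	inFlight map[string]struct{} // plays the role of Evaluator.preempting
}

// runAsync registers id before it returns and does the work in a goroutine.
// Like prepareCandidateAsync after this commit, it returns nothing; a failure
// is only logged by the goroutine.
func (r *asyncRunner) runAsync(id string, work func(context.Context) error) {
	r.mu.Lock()
	r.inFlight[id] = struct{}{}
	r.mu.Unlock()

	// A fresh context, detached from the caller, because the work may outlive
	// the cycle that started it (as the comment in the diff explains).
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		defer cancel()
		defer func() {
			r.mu.Lock()
			delete(r.inFlight, id)
			r.mu.Unlock()
		}()
		if err := work(ctx); err != nil {
			fmt.Println("async work failed:", err)
		}
	}()
}

func main() {
	r := &asyncRunner{inFlight: map[string]struct{}{}}
	r.runAsync("preemptor", func(ctx context.Context) error {
		time.Sleep(10 * time.Millisecond) // stand-in for deleting the victims
		return nil
	})

	// The entry is guaranteed to exist as soon as runAsync returns.
	r.mu.Lock()
	_, registered := r.inFlight["preemptor"]
	r.mu.Unlock()
	fmt.Println("registered synchronously:", registered)

	time.Sleep(50 * time.Millisecond) // give the goroutine time to finish
}
```

Because registration happens before the call returns, a caller (or a test) can inspect the in-flight set right away without polling; only the effects of the background work need an eventual check, which is what the refactored test below does.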


@@ -22,12 +22,16 @@ import (
"fmt"
"sort"
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
@@ -421,15 +425,22 @@ func TestPrepareCandidate(t *testing.T) {
)
tests := []struct {
name string
nodeNames []string
candidate *fakeCandidate
preemptor *v1.Pod
testPods []*v1.Pod
name string
nodeNames []string
candidate *fakeCandidate
preemptor *v1.Pod
testPods []*v1.Pod
expectedDeletedPods []string
expectedDeletionError bool
expectedPatchError bool
// Only compared when async preemption is disabled.
expectedStatus *framework.Status
// Only compared when async preemption is enabled.
expectedPreemptingMap sets.Set[types.UID]
}{
{
name: "no victims",
candidate: &fakeCandidate{
victims: &extenderv1.Victims{},
},
@@ -437,11 +448,13 @@ func TestPrepareCandidate(t *testing.T) {
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
nodeNames: []string{node1Name},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim without condition",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
@@ -454,11 +467,14 @@ func TestPrepareCandidate(t *testing.T) {
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim with same condition",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
@@ -471,11 +487,14 @@ func TestPrepareCandidate(t *testing.T) {
testPods: []*v1.Pod{
victim1WithMatchingCondition,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim, not-found victim error is ignored when patching",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
@@ -484,13 +503,16 @@ func TestPrepareCandidate(t *testing.T) {
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedStatus: nil,
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim, but patch pod failed",
name: "one victim, but pod deletion failed",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
@@ -499,13 +521,16 @@ func TestPrepareCandidate(t *testing.T) {
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
preemptor: preemptor,
testPods: []*v1.Pod{},
expectedDeletionError: true,
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("delete pod failed")),
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim, not-found victim error is ignored when deleting",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
@@ -514,13 +539,16 @@ func TestPrepareCandidate(t *testing.T) {
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedStatus: nil,
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim, but delete pod failed",
name: "one victim, but patch pod failed",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
@@ -529,17 +557,21 @@ func TestPrepareCandidate(t *testing.T) {
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("delete pod failed")),
}, {
name: "two victims without condition, one passes successfully and the second fails (not found victim2 pod)",
preemptor: preemptor,
testPods: []*v1.Pod{},
expectedPatchError: true,
nodeNames: []string{node1Name},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "two victims without condition, one passes successfully and the second fails",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
victim1,
failVictim,
victim2,
},
},
@@ -548,79 +580,137 @@ func TestPrepareCandidate(t *testing.T) {
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
nodeNames: []string{node1Name},
expectedPatchError: true,
expectedDeletedPods: []string{"victim2"},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metrics.Register()
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for _, asyncPreemptionEnabled := range []bool{true, false} {
for _, tt := range tests {
t.Run(fmt.Sprintf("%v (Async preemption enabled: %v)", tt.name, asyncPreemptionEnabled), func(t *testing.T) {
metrics.Register()
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodes := make([]*v1.Node, len(tt.nodeNames))
for i, nodeName := range tt.nodeNames {
nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
}
registeredPlugins := append([]tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
var objs []runtime.Object
for _, pod := range tt.testPods {
objs = append(objs, pod)
}
cs := clientsetfake.NewClientset(objs...)
cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
if action.(clienttesting.DeleteAction).GetName() == "fail-victim" {
return true, nil, fmt.Errorf("delete pod failed")
nodes := make([]*v1.Node, len(tt.nodeNames))
for i, nodeName := range tt.nodeNames {
nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
}
registeredPlugins := append([]tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
)
var objs []runtime.Object
for _, pod := range tt.testPods {
objs = append(objs, pod)
}
requestStopper := make(chan struct{})
deletedPods := sets.New[string]()
deletionFailure := false // whether any request to delete pod failed
patchFailure := false // whether any request to patch pod status failed
cs := clientsetfake.NewClientset(objs...)
cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
<-requestStopper
name := action.(clienttesting.DeleteAction).GetName()
if name == "fail-victim" {
deletionFailure = true
return true, nil, fmt.Errorf("delete pod failed")
}
deletedPods.Insert(name)
return true, nil, nil
})
cs.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
<-requestStopper
if action.(clienttesting.PatchAction).GetName() == "fail-victim" {
patchFailure = true
return true, nil, fmt.Errorf("patch pod status failed")
}
return true, nil, nil
})
informerFactory := informers.NewSharedInformerFactory(cs, 0)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()})
fwk, err := tf.NewFramework(
ctx,
registeredPlugins, "",
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithLogger(logger),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)),
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")),
)
if err != nil {
t.Fatal(err)
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}
pe := NewEvaluator("FakePreemptionScorePostFilter", fwk, fakePreemptionScorePostFilterPlugin, asyncPreemptionEnabled)
if asyncPreemptionEnabled {
pe.prepareCandidateAsync(tt.candidate, tt.preemptor, "test-plugin")
pe.mu.Lock()
// The pod is registered in the preempting map synchronously,
// so we don't need wait.Poll here.
if !tt.expectedPreemptingMap.Equal(pe.preempting) {
t.Errorf("expected preempting map %v, got %v", tt.expectedPreemptingMap, pe.preempting)
close(requestStopper)
pe.mu.Unlock()
return
}
pe.mu.Unlock()
// make the requests complete
close(requestStopper)
} else {
close(requestStopper) // no need to stop requests
status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin")
if tt.expectedStatus == nil {
if status != nil {
t.Errorf("expect nil status, but got %v", status)
}
} else {
if status == nil {
t.Errorf("expect status %v, but got nil", tt.expectedStatus)
} else if status.Code() != tt.expectedStatus.Code() {
t.Errorf("expect status code %v, but got %v", tt.expectedStatus.Code(), status.Code())
} else if status.Message() != tt.expectedStatus.Message() {
t.Errorf("expect status message %v, but got %v", tt.expectedStatus.Message(), status.Message())
}
}
}
var lastErrMsg string
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
if !deletedPods.Equal(sets.New(tt.expectedDeletedPods...)) {
lastErrMsg = fmt.Sprintf("expected deleted pods %v, got %v", tt.expectedDeletedPods, deletedPods.UnsortedList())
return false, nil
}
if tt.expectedDeletionError != deletionFailure {
lastErrMsg = fmt.Sprintf("expected deletion error %v, got %v", tt.expectedDeletionError, deletionFailure)
return false, nil
}
if tt.expectedPatchError != patchFailure {
lastErrMsg = fmt.Sprintf("expected patch error %v, got %v", tt.expectedPatchError, patchFailure)
return false, nil
}
return true, nil
}); err != nil {
t.Fatal(lastErrMsg)
}
return true, nil, nil
})
cs.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
if action.(clienttesting.PatchAction).GetName() == "fail-victim" {
return true, nil, fmt.Errorf("patch pod status failed")
}
return true, nil, nil
})
informerFactory := informers.NewSharedInformerFactory(cs, 0)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()})
fwk, err := tf.NewFramework(
ctx,
registeredPlugins, "",
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithLogger(logger),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)),
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")),
)
if err != nil {
t.Fatal(err)
}
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}
pe := NewEvaluator("FakePreemptionScorePostFilter", fwk, fakePreemptionScorePostFilterPlugin, false)
status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin")
if tt.expectedStatus == nil {
if status != nil {
t.Errorf("expect nil status, but got %v", status)
}
} else {
if status == nil {
t.Errorf("expect status %v, but got nil", tt.expectedStatus)
} else if status.Code() != tt.expectedStatus.Code() {
t.Errorf("expect status code %v, but got %v", tt.expectedStatus.Code(), status.Code())
}
}
})
}
}
}
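For the async path, the refactored test keeps the goroutine's API calls parked on a channel (requestStopper) until the preempting map has been asserted, then releases them and polls for the eventual deletions. Below is a compact, self-contained sketch of just that blocking-reactor pattern, with a hypothetical pod name ("victim-a") and a plain goroutine standing in for prepareCandidateAsync; the client-go calls (NewClientset, PrependReactor, PollUntilContextTimeout) are the same ones the diff already uses.

```go
// Sketch of the blocking-reactor technique from TestPrepareCandidate,
// reduced to the essentials. The pod name and this standalone test are
// illustrative only.
package preemption_test

import (
	"context"
	"sync"
	"testing"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
	clienttesting "k8s.io/client-go/testing"
)

func TestBlockingReactorSketch(t *testing.T) {
	requestStopper := make(chan struct{})
	var mu sync.Mutex
	deleted := sets.New[string]()

	cs := clientsetfake.NewClientset()
	cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
		<-requestStopper // hold every DELETE until the test is ready
		mu.Lock()
		deleted.Insert(action.(clienttesting.DeleteAction).GetName())
		mu.Unlock()
		return true, nil, nil
	})

	// Stand-in for the goroutine that prepareCandidateAsync would launch.
	go func() {
		_ = cs.CoreV1().Pods("default").Delete(context.Background(), "victim-a", metav1.DeleteOptions{})
	}()

	// Synchronous assertions (e.g. on the preempting map) would run here,
	// while the DELETE above is still blocked.

	close(requestStopper) // release the blocked request

	if err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, wait.ForeverTestTimeout, false,
		func(ctx context.Context) (bool, error) {
			mu.Lock()
			defer mu.Unlock()
			return deleted.Has("victim-a"), nil
		}); err != nil {
		t.Fatalf("victim-a was never deleted: %v", err)
	}
}
```

Closing requestStopper is the only synchronization point; everything after it is verified by polling because the deletion happens on the goroutine's schedule, not the test's.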