From 98bdbdcf1840a6b7a019b606caa6b21c3cbd278a Mon Sep 17 00:00:00 2001
From: Kensei Nakada
Date: Wed, 30 Oct 2024 22:41:54 +0900
Subject: [PATCH] fix: refactor TestPrepareCandidate to test async feature too
---
 .../framework/preemption/preemption.go | 16 +-
 .../framework/preemption/preemption_test.go | 288 ++++++++++++------
 2 files changed, 199 insertions(+), 105 deletions(-)
diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go
index 89070f72b4a..91a0d629002 100644
--- a/pkg/scheduler/framework/preemption/preemption.go
+++ b/pkg/scheduler/framework/preemption/preemption.go
@@ -290,9 +290,7 @@ func (ev *Evaluator) Preempt(ctx context.Context, state *framework.CycleState, p
 // 5) Perform preparation work before nominating the selected candidate.
 if ev.enableAsyncPreemption {
- if status := ev.prepareCandidateAsync(bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
- return nil, status
- }
+ ev.prepareCandidateAsync(bestCandidate, pod, ev.PluginName)
 } else {
 if status := ev.prepareCandidate(ctx, bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
 return nil, status
@@ -469,7 +467,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
 // The Pod won't be retried until the goroutine triggered here completes.
 //
 // See http://kep.k8s.io/4832 for how the async preemption works.
-func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
+func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName string) {
 metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))
 // intentionally create a new context, not using a ctx from the scheduling cycle, to create ctx,
@@ -509,6 +507,14 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
 // and the pod could end up stuck in the unschedulable pod pool
 // by all the pod removal events being ignored.
+ if len(c.Victims().Pods) == 0 { + ev.mu.Lock() + delete(ev.preempting, pod.UID) + ev.mu.Unlock() + + return + } + ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName) if err := errCh.ReceiveError(); err != nil { logger.Error(err, "Error occurred during preemption") @@ -524,8 +530,6 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name()) }() - - return nil } func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister) ([]*policy.PodDisruptionBudget, error) { diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index d0fbe0f9c6f..e00644b3390 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -22,12 +22,16 @@ import ( "fmt" "sort" "testing" + "time" "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" @@ -421,15 +425,22 @@ func TestPrepareCandidate(t *testing.T) { ) tests := []struct { - name string - nodeNames []string - candidate *fakeCandidate - preemptor *v1.Pod - testPods []*v1.Pod + name string + nodeNames []string + candidate *fakeCandidate + preemptor *v1.Pod + testPods []*v1.Pod + expectedDeletedPods []string + expectedDeletionError bool + expectedPatchError bool + // Only compared when async preemption is disabled. expectedStatus *framework.Status + // Only compared when async preemption is enabled. 
+ expectedPreemptingMap sets.Set[types.UID] }{ { name: "no victims", + candidate: &fakeCandidate{ victims: &extenderv1.Victims{}, }, @@ -437,11 +448,13 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1, }, - nodeNames: []string{node1Name}, - expectedStatus: nil, + nodeNames: []string{node1Name}, + expectedStatus: nil, + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, { name: "one victim without condition", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -454,11 +467,14 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1, }, - nodeNames: []string{node1Name}, - expectedStatus: nil, + nodeNames: []string{node1Name}, + expectedDeletedPods: []string{"victim1"}, + expectedStatus: nil, + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, { name: "one victim with same condition", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -471,11 +487,14 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1WithMatchingCondition, }, - nodeNames: []string{node1Name}, - expectedStatus: nil, + nodeNames: []string{node1Name}, + expectedDeletedPods: []string{"victim1"}, + expectedStatus: nil, + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, { name: "one victim, not-found victim error is ignored when patching", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -484,13 +503,16 @@ func TestPrepareCandidate(t *testing.T) { }, }, }, - preemptor: preemptor, - testPods: []*v1.Pod{}, - nodeNames: []string{node1Name}, - expectedStatus: nil, + preemptor: preemptor, + testPods: []*v1.Pod{}, + nodeNames: []string{node1Name}, + expectedDeletedPods: []string{"victim1"}, + expectedStatus: nil, + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, { - name: "one victim, but patch pod failed", + name: "one victim, but pod deletion failed", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -499,13 +521,16 @@ func TestPrepareCandidate(t *testing.T) { }, }, }, - preemptor: preemptor, - testPods: []*v1.Pod{}, - nodeNames: []string{node1Name}, - expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), + preemptor: preemptor, + testPods: []*v1.Pod{}, + expectedDeletionError: true, + nodeNames: []string{node1Name}, + expectedStatus: framework.AsStatus(errors.New("delete pod failed")), + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, { name: "one victim, not-found victim error is ignored when deleting", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -514,13 +539,16 @@ func TestPrepareCandidate(t *testing.T) { }, }, }, - preemptor: preemptor, - testPods: []*v1.Pod{}, - nodeNames: []string{node1Name}, - expectedStatus: nil, + preemptor: preemptor, + testPods: []*v1.Pod{}, + nodeNames: []string{node1Name}, + expectedDeletedPods: []string{"victim1"}, + expectedStatus: nil, + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, { - name: "one victim, but delete pod failed", + name: "one victim, but patch pod failed", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -529,17 +557,21 @@ func TestPrepareCandidate(t *testing.T) { }, }, }, - preemptor: preemptor, - testPods: []*v1.Pod{}, - nodeNames: []string{node1Name}, - expectedStatus: framework.AsStatus(errors.New("delete pod failed")), - }, { - name: "two victims without condition, one passes successfully and the second fails (not found victim2 pod)", + preemptor: 
preemptor, + testPods: []*v1.Pod{}, + expectedPatchError: true, + nodeNames: []string{node1Name}, + expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), + expectedPreemptingMap: sets.New(types.UID("preemptor")), + }, + { + name: "two victims without condition, one passes successfully and the second fails", + candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ Pods: []*v1.Pod{ - victim1, + failVictim, victim2, }, }, @@ -548,79 +580,137 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1, }, - nodeNames: []string{node1Name}, - expectedStatus: nil, + nodeNames: []string{node1Name}, + expectedPatchError: true, + expectedDeletedPods: []string{"victim2"}, + expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - metrics.Register() - logger, ctx := ktesting.NewTestContext(t) - ctx, cancel := context.WithCancel(ctx) - defer cancel() + for _, asyncPreemptionEnabled := range []bool{true, false} { + for _, tt := range tests { + t.Run(fmt.Sprintf("%v (Async preemption enabled: %v)", tt.name, asyncPreemptionEnabled), func(t *testing.T) { + metrics.Register() + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() - nodes := make([]*v1.Node, len(tt.nodeNames)) - for i, nodeName := range tt.nodeNames { - nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj() - } - registeredPlugins := append([]tf.RegisterPluginFunc{ - tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)}, - tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), - ) - var objs []runtime.Object - for _, pod := range tt.testPods { - objs = append(objs, pod) - } - cs := clientsetfake.NewClientset(objs...) - cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { - if action.(clienttesting.DeleteAction).GetName() == "fail-victim" { - return true, nil, fmt.Errorf("delete pod failed") + nodes := make([]*v1.Node, len(tt.nodeNames)) + for i, nodeName := range tt.nodeNames { + nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj() + } + registeredPlugins := append([]tf.RegisterPluginFunc{ + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)}, + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + ) + var objs []runtime.Object + for _, pod := range tt.testPods { + objs = append(objs, pod) + } + + requestStopper := make(chan struct{}) + deletedPods := sets.New[string]() + deletionFailure := false // whether any request to delete pod failed + patchFailure := false // whether any request to patch pod status failed + + cs := clientsetfake.NewClientset(objs...) 
+ cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + <-requestStopper + name := action.(clienttesting.DeleteAction).GetName() + if name == "fail-victim" { + deletionFailure = true + return true, nil, fmt.Errorf("delete pod failed") + } + + deletedPods.Insert(name) + return true, nil, nil + }) + + cs.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + <-requestStopper + if action.(clienttesting.PatchAction).GetName() == "fail-victim" { + patchFailure = true + return true, nil, fmt.Errorf("patch pod status failed") + } + return true, nil, nil + }) + + informerFactory := informers.NewSharedInformerFactory(cs, 0) + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()}) + fwk, err := tf.NewFramework( + ctx, + registeredPlugins, "", + frameworkruntime.WithClientSet(cs), + frameworkruntime.WithLogger(logger), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), + frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)), + frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), + frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")), + ) + if err != nil { + t.Fatal(err) + } + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{} + pe := NewEvaluator("FakePreemptionScorePostFilter", fwk, fakePreemptionScorePostFilterPlugin, asyncPreemptionEnabled) + + if asyncPreemptionEnabled { + pe.prepareCandidateAsync(tt.candidate, tt.preemptor, "test-plugin") + pe.mu.Lock() + // The preempting map should be registered synchronously + // so we don't need wait.Poll. 
+ if !tt.expectedPreemptingMap.Equal(pe.preempting) {
+ t.Errorf("expected preempting map %v, got %v", tt.expectedPreemptingMap, pe.preempting)
+ close(requestStopper)
+ pe.mu.Unlock()
+ return
+ }
+ pe.mu.Unlock()
+ // make the requests complete
+ close(requestStopper)
+ } else {
+ close(requestStopper) // no need to stop requests
+ status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin")
+ if tt.expectedStatus == nil {
+ if status != nil {
+ t.Errorf("expect nil status, but got %v", status)
+ }
+ } else {
+ if status == nil {
+ t.Errorf("expect status %v, but got nil", tt.expectedStatus)
+ } else if status.Code() != tt.expectedStatus.Code() {
+ t.Errorf("expect status code %v, but got %v", tt.expectedStatus.Code(), status.Code())
+ } else if status.Message() != tt.expectedStatus.Message() {
+ t.Errorf("expect status message %v, but got %v", tt.expectedStatus.Message(), status.Message())
+ }
+ }
+ }
+
+ var lastErrMsg string
+ if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
+ if !deletedPods.Equal(sets.New(tt.expectedDeletedPods...)) {
+ lastErrMsg = fmt.Sprintf("expected deleted pods %v, got %v", tt.expectedDeletedPods, deletedPods.UnsortedList())
+ return false, nil
+ }
+ if tt.expectedDeletionError != deletionFailure {
+ lastErrMsg = fmt.Sprintf("expected deletion error %v, got %v", tt.expectedDeletionError, deletionFailure)
+ return false, nil
+ }
+ if tt.expectedPatchError != patchFailure {
+ lastErrMsg = fmt.Sprintf("expected patch error %v, got %v", tt.expectedPatchError, patchFailure)
+ return false, nil
+ }
+ return true, nil
+ }); err != nil {
+ t.Fatal(lastErrMsg)
 }
 })
-
- cs.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
- if action.(clienttesting.PatchAction).GetName() == "fail-victim" {
- return true, nil, fmt.Errorf("patch pod status failed")
- }
- return true, nil, nil
- })
-
- informerFactory := informers.NewSharedInformerFactory(cs, 0)
- eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()})
- fwk, err := tf.NewFramework(
- ctx,
- registeredPlugins, "",
- frameworkruntime.WithClientSet(cs),
- frameworkruntime.WithLogger(logger),
- frameworkruntime.WithInformerFactory(informerFactory),
- frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
- frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)),
- frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
- frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")),
- )
- if err != nil {
- t.Fatal(err)
- }
- informerFactory.Start(ctx.Done())
- informerFactory.WaitForCacheSync(ctx.Done())
- fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}
- pe := NewEvaluator("FakePreemptionScorePostFilter", fwk, fakePreemptionScorePostFilterPlugin, false)
-
- status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin")
- if tt.expectedStatus == nil {
- if status != nil {
- t.Errorf("expect nil status, but got %v", status)
- }
- } else {
- if status == nil {
- t.Errorf("expect status %v, but got nil", tt.expectedStatus)
- } else if status.Code() != tt.expectedStatus.Code() {
- t.Errorf("expect status code %v, but got %v", tt.expectedStatus.Code(), status.Code())
- }
- }
- })
+ }
 }
}
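
A few notes on the patterns this patch relies on follow, with small self-contained Go sketches. All helper names in the sketches are illustrative, not scheduler APIs.

First, the zero-victims guard added to prepareCandidateAsync: the goroutine hands victims to Parallelizer().Until with len(c.Victims().Pods)-1 pieces, which would be negative for an empty victim list, and with no victims there is no deletion event left to requeue the preemptor, so the guard removes pod.UID from ev.preempting and returns early. A minimal sketch of the same early-return shape, assuming a hypothetical runParallel helper:

    package main

    import (
        "fmt"
        "sync"
    )

    // runParallel executes fn(i) for i in [0, n) on separate goroutines.
    // A non-positive n returns early; this mirrors the new guard in
    // prepareCandidateAsync, which must not call Until with a negative count.
    func runParallel(n int, fn func(int)) {
        if n <= 0 {
            return
        }
        var wg sync.WaitGroup
        for i := 0; i < n; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                fn(i)
            }(i)
        }
        wg.Wait()
    }

    func main() {
        victims := []string{} // the zero-victims case: the guard path
        runParallel(len(victims)-1, func(i int) {
            fmt.Println("evicting", victims[i])
        })
        fmt.Println("no goroutines were spawned")
    }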
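
Second, the requestStopper channel is what makes the async assertions in the test deterministic: both fake-client reactors park on <-requestStopper, so the test can inspect pe.preempting before any delete or patch request completes, then release all blocked requests at once by closing the channel. The same gating technique in isolation:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        requestStopper := make(chan struct{})
        var mu sync.Mutex
        deleted := []string{}

        var wg sync.WaitGroup
        for _, name := range []string{"victim1", "victim2"} {
            wg.Add(1)
            go func(name string) { // stands in for a fake-client reactor
                defer wg.Done()
                <-requestStopper // parked here until the channel is closed
                mu.Lock()
                deleted = append(deleted, name)
                mu.Unlock()
            }(name)
        }

        // Assert on synchronous state first: no request has completed yet.
        mu.Lock()
        fmt.Println("before release:", len(deleted))
        mu.Unlock()

        close(requestStopper) // releases every waiter at once, unlike a send
        wg.Wait()

        mu.Lock()
        fmt.Println("after release:", len(deleted))
        mu.Unlock()
    }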
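
Finally, once requestStopper is closed the preemption goroutine finishes at its own pace, so deletions cannot be asserted immediately; the test polls with wait.PollUntilContextTimeout and keeps the latest mismatch in lastErrMsg, so a timeout fails with a readable message rather than a bare context-deadline error. A stripped-down sketch of that pattern, with a simulated asyncDone flag standing in for the goroutine:

    package main

    import (
        "context"
        "fmt"
        "sync/atomic"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        ctx := context.Background()

        var asyncDone atomic.Bool
        go func() { // stands in for the preemption goroutine
            time.Sleep(300 * time.Millisecond)
            asyncDone.Store(true)
        }()

        // Record why each attempt failed; on timeout this message is far
        // more useful than the generic error PollUntilContextTimeout returns.
        var lastErrMsg string
        err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, false,
            func(ctx context.Context) (bool, error) {
                if !asyncDone.Load() {
                    lastErrMsg = "async preemption has not finished yet"
                    return false, nil // (false, nil) means retry after the interval
                }
                return true, nil
            })
        if err != nil {
            fmt.Println(lastErrMsg)
            return
        }
        fmt.Println("async work observed")
    }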