diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index 0ae9f65e030..89070f72b4a 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -186,7 +186,7 @@ func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsy } if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil { if !apierrors.IsNotFound(err) { - logger.Error(err, "Preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor)) + logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor)) return err } logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) @@ -488,7 +488,7 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName ev.mu.Unlock() logger := klog.FromContext(ctx) - go func() { // TODO: use paralizer + go func() { defer cancel() logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods)) diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index be71f8b2a11..d0fbe0f9c6f 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/informers" clientsetfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/events" "k8s.io/klog/v2/ktesting" extenderv1 "k8s.io/kube-scheduler/extender/v1" @@ -391,6 +392,11 @@ func TestPrepareCandidate(t *testing.T) { Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). Obj() + failVictim = st.MakePod().Name("fail-victim").UID("victim1"). + Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() + victim2 = st.MakePod().Name("victim2").UID("victim2"). Node(node1Name).SchedulerName(defaultSchedulerName).Priority(50000). Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). @@ -402,6 +408,12 @@ func TestPrepareCandidate(t *testing.T) { Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). Obj() + failVictim1WithMatchingCondition = st.MakePod().Name("fail-victim").UID("victim1"). + Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). + Conditions([]v1.PodCondition{condition}). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() + preemptor = st.MakePod().Name("preemptor").UID("preemptor"). SchedulerName(defaultSchedulerName).Priority(highPriority). Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). @@ -463,7 +475,7 @@ func TestPrepareCandidate(t *testing.T) { expectedStatus: nil, }, { - name: "one victim, but patch pod failed (not found victim1 pod)", + name: "one victim, not-found victim error is ignored when patching", candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -475,10 +487,25 @@ func TestPrepareCandidate(t *testing.T) { preemptor: preemptor, testPods: []*v1.Pod{}, nodeNames: []string{node1Name}, + expectedStatus: nil, + }, + { + name: "one victim, but patch pod failed", + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + failVictim1WithMatchingCondition, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{}, + nodeNames: []string{node1Name}, expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), }, { - name: "one victim, but delete pod failed (not found victim1 pod)", + name: "one victim, not-found victim error is ignored when deleting", candidate: &fakeCandidate{ name: node1Name, victims: &extenderv1.Victims{ @@ -490,9 +517,23 @@ func TestPrepareCandidate(t *testing.T) { preemptor: preemptor, testPods: []*v1.Pod{}, nodeNames: []string{node1Name}, - expectedStatus: framework.AsStatus(errors.New("delete pod failed")), + expectedStatus: nil, }, { + name: "one victim, but delete pod failed", + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + failVictim, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{}, + nodeNames: []string{node1Name}, + expectedStatus: framework.AsStatus(errors.New("delete pod failed")), + }, { name: "two victims without condition, one passes successfully and the second fails (not found victim2 pod)", candidate: &fakeCandidate{ name: node1Name, @@ -508,7 +549,7 @@ func TestPrepareCandidate(t *testing.T) { victim1, }, nodeNames: []string{node1Name}, - expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), + expectedStatus: nil, }, } @@ -532,6 +573,20 @@ func TestPrepareCandidate(t *testing.T) { objs = append(objs, pod) } cs := clientsetfake.NewClientset(objs...) + cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + if action.(clienttesting.DeleteAction).GetName() == "fail-victim" { + return true, nil, fmt.Errorf("delete pod failed") + } + return true, nil, nil + }) + + cs.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { + if action.(clienttesting.PatchAction).GetName() == "fail-victim" { + return true, nil, fmt.Errorf("patch pod status failed") + } + return true, nil, nil + }) + informerFactory := informers.NewSharedInformerFactory(cs, 0) eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()}) fwk, err := tf.NewFramework( @@ -551,11 +606,7 @@ func TestPrepareCandidate(t *testing.T) { informerFactory.Start(ctx.Done()) informerFactory.WaitForCacheSync(ctx.Done()) fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{} - pe := Evaluator{ - PluginName: "FakePreemptionScorePostFilter", - Handler: fwk, - Interface: fakePreemptionScorePostFilterPlugin, - } + pe := NewEvaluator("FakePreemptionScorePostFilter", fwk, fakePreemptionScorePostFilterPlugin, false) status := pe.prepareCandidate(ctx, tt.candidate, tt.preemptor, "test-plugin") if tt.expectedStatus == nil {