Merge pull request #100325 from cwdsuzhou/donot_delete_waitingpod

Preempting: do not delete the victim if it just exists in WaitingPods
Authored by Kubernetes Prow Robot on 2021-04-08 20:29:46 -07:00; committed by GitHub
commit 4b9421674f
2 changed files with 41 additions and 14 deletions
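A victim that is still parked at the Permit extension point is tracked by the scheduler framework as a WaitingPod: it has been assumed on a node but not yet bound or started. Deleting such a victim through the API is unnecessary; rejecting it via the framework handle fails its scheduling attempt and releases the resources it reserved. Only victims that are already past Permit (bound or running) still need an actual delete. The resulting victim-handling flow in PrepareCandidate is roughly the sketch below; identifiers, the log call, and the event come straight from the diff that follows, only the comments are added for explanation:

	for _, victim := range c.Victims().Pods {
		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
			// The victim is still waiting at Permit: rejecting it through the
			// framework handle is enough, no API delete is needed.
			waitingPod.Reject(pluginName, "preempted")
		} else if err := util.DeletePod(cs, victim); err != nil {
			// The victim is past Permit (bound or running), so it has to be
			// deleted; any delete error is surfaced to the caller.
			klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
			return framework.AsStatus(err)
		}
		// An event is recorded against the victim in either case.
		fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting",
			"Preempted by %v/%v on node %v", pod.Namespace, pod.Name, c.Name())
	}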


@@ -694,13 +694,13 @@ func selectVictimsOnNode(
 // - Clear the low-priority pods' nominatedNodeName status if needed
 func PrepareCandidate(c Candidate, fh framework.Handle, cs kubernetes.Interface, pod *v1.Pod, pluginName string) *framework.Status {
 	for _, victim := range c.Victims().Pods {
-		if err := util.DeletePod(cs, victim); err != nil {
-			klog.ErrorS(err, "preempting pod", "pod", klog.KObj(victim))
-			return framework.AsStatus(err)
-		}
-		// If the victim is a WaitingPod, send a reject message to the PermitPlugin
+		// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
+		// Otherwise we should delete the victim.
 		if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
 			waitingPod.Reject(pluginName, "preempted")
+		} else if err := util.DeletePod(cs, victim); err != nil {
+			klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
+			return framework.AsStatus(err)
 		}
 		fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
 			pod.Namespace, pod.Name, c.Name())

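For context on the test changes below: a pod becomes a WaitingPod when a Permit plugin returns a Wait status for it, which parks the pod at the Permit stage until it is allowed, rejected, or times out. The integration test drives this through its permitPlugin, configured with waitAndAllowPermit and waitingPod = "waiting-pod" so that only the pod named "waiting-pod" is parked at Permit while the other pods proceed. As a rough illustration of the mechanism, a minimal, hypothetical Permit plugin that parks pods this way could look like the sketch below; the package name, plugin name, and factory wiring are assumptions for illustration, only the types from k8s.io/kubernetes/pkg/scheduler/framework are the real interfaces involved.

package example

import (
	"context"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// waitPermitPlugin is a hypothetical Permit plugin that parks every pod at the
// Permit stage for up to 30 seconds. While parked, a pod is visible through
// Handle.GetWaitingPod and Handle.IterateOverWaitingPods, which is what the
// preemption fix above and the updated assertions below rely on.
type waitPermitPlugin struct {
	fh framework.Handle
}

// New is the factory that would be registered in the scheduler's plugin registry.
func New(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
	return &waitPermitPlugin{fh: fh}, nil
}

func (p *waitPermitPlugin) Name() string { return "wait-permit-plugin" }

// Permit returns a Wait status, so the scheduler records the pod as a WaitingPod
// instead of binding it right away; a later Allow or Reject (for example by
// preemption, as in this PR) ends the wait.
func (p *waitPermitPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
	return framework.NewStatus(framework.Wait), 30 * time.Second
}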

@@ -24,7 +24,7 @@ import (
 	"time"
 
 	v1 "k8s.io/api/core/v1"
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -1850,14 +1850,29 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 	permitPlugin.timeoutPermit = false
 	permitPlugin.waitAndRejectPermit = false
 	permitPlugin.waitAndAllowPermit = true
 	permitPlugin.waitingPod = "waiting-pod"
 	lowPriority, highPriority := int32(100), int32(300)
 	resourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
 		v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
 		v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
 	}
+	preemptorResourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
+		v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
+		v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
+	}
 
-	// First pod will go waiting.
+	// First pod will go running.
+	runningPod := initPausePod(&pausePodConfig{Name: "running-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
+	runningPod.Spec.TerminationGracePeriodSeconds = new(int64)
+	runningPod, err = createPausePod(testCtx.ClientSet, runningPod)
+	if err != nil {
+		t.Errorf("Error while creating the waiting pod: %v", err)
+	}
+	// Wait until the pod scheduled, then create a preemptor pod to preempt it.
+	wait.Poll(100*time.Millisecond, 30*time.Second, podScheduled(testCtx.ClientSet, runningPod.Name, runningPod.Namespace))
+	// Second pod will go waiting.
 	waitingPod := initPausePod(&pausePodConfig{Name: "waiting-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
 	waitingPod.Spec.TerminationGracePeriodSeconds = new(int64)
 	waitingPod, err = createPausePod(testCtx.ClientSet, waitingPod)
@@ -1871,9 +1886,9 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 		return w, nil
 	})
-	// Create second pod which should preempt first pod.
+	// Create third pod which should preempt other pods.
 	preemptorPod, err := createPausePod(testCtx.ClientSet,
-		initPausePod(&pausePodConfig{Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, Resources: &resourceRequest}))
+		initPausePod(&pausePodConfig{Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, Resources: &preemptorResourceRequest}))
 	if err != nil {
 		t.Errorf("Error while creating the preemptor pod: %v", err)
 	}
@@ -1885,18 +1900,30 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
 	// }
 	if err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
-		_, err := getPod(testCtx.ClientSet, waitingPod.Name, waitingPod.Namespace)
-		return apierrors.IsNotFound(err), nil
+		w := false
+		permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
+		return !w, nil
 	}); err != nil {
-		t.Error("Expected the waiting pod to get preempted and deleted")
+		t.Error("Expected the waiting pod to get preempted")
 	}
+	// Expect the waitingPod to be still present.
+	if _, err := getPod(testCtx.ClientSet, waitingPod.Name, waitingPod.Namespace); err != nil {
+		t.Error("Get waiting pod in waiting pod failed.")
+	}
+	// Expect the runningPod to be deleted physically.
+	_, err = getPod(testCtx.ClientSet, runningPod.Name, runningPod.Namespace)
+	if err != nil && !errors.IsNotFound(err) {
+		t.Error("Get running pod failed.")
+	}
+	if err == nil {
+		t.Error("Running pod still exist.")
+	}
 	if permitPlugin.numPermitCalled == 0 {
 		t.Errorf("Expected the permit plugin to be called.")
 	}
 	permitPlugin.reset()
-	testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, preemptorPod})
+	testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, runningPod, preemptorPod})
 }
 
 func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext {