fix scheduler.TestCoSchedulinngWithPermitPlugin and test scheduler.PermitPlugin

After moving Permit() to the scheduling cycle test PermitPlugin should
no longer wait inside Permit() for another pod to enter Permit() and become waiting pod.
In the past this was a way to make test work regardless of order in
which pods enter Permit(), but now only one Permit() can be executed at
any given moment and waiting for another pod to enter Permit() inside
Permit() leads to timeouts.

In this change waitAndRejectPermit and waitAndAllowPermit flags make first
pod to enter Permit() a waiting pod and second pod to enter Permit()
either rejecting or allowing pod.

Mentioned in #88469
This commit is contained in:
Mateusz Litwin 2020-02-25 21:38:22 -08:00
parent 79e1ad2f4b
commit b93e3d18e9

View File

@ -102,8 +102,10 @@ type PermitPlugin struct {
timeoutPermit bool
waitAndRejectPermit bool
waitAndAllowPermit bool
allowPermit bool
cancelled bool
waitingPod string
rejectingPod string
allowingPod string
fh framework.FrameworkHandle
}
@ -406,27 +408,20 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState,
}()
return framework.NewStatus(framework.Wait, ""), 3 * time.Second
}
if pp.allowPermit && pod.Name != "waiting-pod" {
return nil, 0
}
if pp.waitAndRejectPermit || pp.waitAndAllowPermit {
if pod.Name == "waiting-pod" {
if pp.waitingPod == "" || pp.waitingPod == pod.Name {
pp.waitingPod = pod.Name
return framework.NewStatus(framework.Wait, ""), 30 * time.Second
}
// This is the signalling pod, wait until the waiting-pod is actually waiting and then either reject or allow it.
wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
w := false
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
return w, nil
})
if pp.waitAndRejectPermit {
pp.rejectingPod = pod.Name
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) {
wp.Reject(fmt.Sprintf("reject pod %v", wp.GetPod().Name))
})
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
}
if pp.waitAndAllowPermit {
pp.allowingPod = pod.Name
pp.allowAllPods()
return nil, 0
}
@ -452,8 +447,10 @@ func (pp *PermitPlugin) reset() {
pp.timeoutPermit = false
pp.waitAndRejectPermit = false
pp.waitAndAllowPermit = false
pp.allowPermit = false
pp.cancelled = false
pp.waitingPod = ""
pp.allowingPod = ""
pp.rejectingPod = ""
}
// newPermitPlugin returns a factory for permit plugin with specified PermitPlugin.
@ -1300,6 +1297,7 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
registry, prof := initRegistryAndConfig(permitPlugin)
// Create the master and the scheduler with the test plugin set.
// TODO Make the subtests not share scheduler instances.
testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "permit-plugin", nil), 2,
scheduler.WithProfiles(prof),
scheduler.WithFrameworkOutOfTreeRegistry(registry))
@ -1326,31 +1324,42 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
permitPlugin.waitAndRejectPermit = test.waitReject
permitPlugin.waitAndAllowPermit = test.waitAllow
// Create two pods.
waitingPod, err := createPausePod(testCtx.ClientSet,
initPausePod(testCtx.ClientSet, &pausePodConfig{Name: "waiting-pod", Namespace: testCtx.NS.Name}))
// Create two pods. First pod to enter Permit() will wait and a second one will either
// reject or allow first one.
podA, err := createPausePod(testCtx.ClientSet,
initPausePod(testCtx.ClientSet, &pausePodConfig{Name: "pod-a", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating the waiting pod: %v", err)
t.Errorf("Error while creating the first pod: %v", err)
}
signallingPod, err := createPausePod(testCtx.ClientSet,
initPausePod(testCtx.ClientSet, &pausePodConfig{Name: "signalling-pod", Namespace: testCtx.NS.Name}))
podB, err := createPausePod(testCtx.ClientSet,
initPausePod(testCtx.ClientSet, &pausePodConfig{Name: "pod-b", Namespace: testCtx.NS.Name}))
if err != nil {
t.Errorf("Error while creating the signalling pod: %v", err)
t.Errorf("Error while creating the second pod: %v", err)
}
if test.waitReject {
if err = waitForPodUnschedulable(testCtx.ClientSet, waitingPod); err != nil {
t.Errorf("test #%v: Didn't expect the waiting pod to be scheduled. error: %v", i, err)
if err = waitForPodUnschedulable(testCtx.ClientSet, podA); err != nil {
t.Errorf("test #%v: Didn't expect the first pod to be scheduled. error: %v", i, err)
}
if err = waitForPodUnschedulable(testCtx.ClientSet, signallingPod); err != nil {
t.Errorf("test #%v: Didn't expect the signalling pod to be scheduled. error: %v", i, err)
if err = waitForPodUnschedulable(testCtx.ClientSet, podB); err != nil {
t.Errorf("test #%v: Didn't expect the second pod to be scheduled. error: %v", i, err)
}
if !((permitPlugin.waitingPod == podA.Name && permitPlugin.rejectingPod == podB.Name) ||
(permitPlugin.waitingPod == podB.Name && permitPlugin.rejectingPod == podA.Name)) {
t.Errorf("test #%v: Expect one pod to wait and another pod to reject instead %s waited and %s rejected.",
i, permitPlugin.waitingPod, permitPlugin.rejectingPod)
}
} else {
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, waitingPod); err != nil {
t.Errorf("test #%v: Expected the waiting pod to be scheduled. error: %v", i, err)
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, podA); err != nil {
t.Errorf("test #%v: Expected the first pod to be scheduled. error: %v", i, err)
}
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, signallingPod); err != nil {
t.Errorf("test #%v: Expected the signalling pod to be scheduled. error: %v", i, err)
if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, podB); err != nil {
t.Errorf("test #%v: Expected the second pod to be scheduled. error: %v", i, err)
}
if !((permitPlugin.waitingPod == podA.Name && permitPlugin.allowingPod == podB.Name) ||
(permitPlugin.waitingPod == podB.Name && permitPlugin.allowingPod == podA.Name)) {
t.Errorf("test #%v: Expect one pod to wait and another pod to allow instead %s waited and %s allowed.",
i, permitPlugin.waitingPod, permitPlugin.allowingPod)
}
}
@ -1359,7 +1368,7 @@ func TestCoSchedulingWithPermitPlugin(t *testing.T) {
}
permitPlugin.reset()
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, signallingPod})
testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{podA, podB})
}
}
@ -1499,7 +1508,6 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
permitPlugin.timeoutPermit = false
permitPlugin.waitAndRejectPermit = false
permitPlugin.waitAndAllowPermit = true
permitPlugin.allowPermit = true
lowPriority, highPriority := int32(100), int32(300)
resourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
@ -1507,7 +1515,7 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
}
// Create two pods.
// First pod will go waiting.
waitingPod := initPausePod(testCtx.ClientSet, &pausePodConfig{Name: "waiting-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
waitingPod.Spec.TerminationGracePeriodSeconds = new(int64)
waitingPod, err = createPausePod(testCtx.ClientSet, waitingPod)
@ -1521,6 +1529,7 @@ func TestPreemptWithPermitPlugin(t *testing.T) {
return w, nil
})
// Create second pod which should preempt first pod.
preemptorPod, err := createPausePod(testCtx.ClientSet,
initPausePod(testCtx.ClientSet, &pausePodConfig{Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, Resources: &resourceRequest}))
if err != nil {