Send a reject message to permit plugin when preempting a WaitingPod

This commit is contained in:
Jun Gong 2019-07-17 21:37:22 +08:00
parent f101466d2e
commit df14adf474
2 changed files with 101 additions and 2 deletions

View File

@ -295,7 +295,7 @@ func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginCon
// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
// If it succeeds, it adds the name of the node where preemption has happened to the pod spec.
// It returns the node name and an error if any.
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
func (sched *Scheduler) preempt(fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
if err != nil {
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
@ -328,6 +328,10 @@ func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, e
klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
return "", err
}
// If the victim is a WaitingPod, send a reject message to the PermitPlugin
if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject("preempted")
}
sched.config.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
@ -486,7 +490,7 @@ func (sched *Scheduler) scheduleOne() {
" No preemption is performed.")
} else {
preemptionStartTime := time.Now()
sched.preempt(pod, fitError)
sched.preempt(fwk, pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))

View File

@ -22,6 +22,7 @@ import (
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@ -85,6 +86,7 @@ type PermitPlugin struct {
timeoutPermit bool
waitAndRejectPermit bool
waitAndAllowPermit bool
allowPermit bool
fh framework.FrameworkHandle
}
@ -363,6 +365,9 @@ func (pp *PermitPlugin) Permit(pc *framework.PluginContext, pod *v1.Pod, nodeNam
if pp.timeoutPermit {
return framework.NewStatus(framework.Wait, ""), 3 * time.Second
}
if pp.allowPermit && pod.Name != "waiting-pod" {
return nil, 0
}
if pp.waitAndRejectPermit || pp.waitAndAllowPermit {
if pod.Name == "waiting-pod" {
return framework.NewStatus(framework.Wait, ""), 30 * time.Second
@ -395,6 +400,7 @@ func (pp *PermitPlugin) reset() {
pp.timeoutPermit = false
pp.waitAndRejectPermit = false
pp.waitAndAllowPermit = false
pp.allowPermit = false
}
// NewPermitPlugin is the factory for permit plugin.
@ -1382,3 +1388,92 @@ func TestFilterPlugin(t *testing.T) {
cleanupPods(cs, t, []*v1.Pod{pod})
}
}
// TestPreemptWithPermitPlugin tests preempt with permit plugins.
func TestPreemptWithPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
registry := framework.Registry{permitPluginName: NewPermitPlugin}
// Setup initial permit plugin for testing.
plugins := &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: permitPluginName,
},
},
},
}
// Set permit plugin config for testing
pluginConfig := []schedulerconfig.PluginConfig{
{
Name: permitPluginName,
Args: runtime.Unknown{},
},
}
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerWithOptions(t,
initTestMaster(t, "preempt-with-permit-plugin", nil),
false, nil, registry, plugins, pluginConfig, false, time.Second)
defer cleanupTest(t, context)
cs := context.clientSet
// Add one node.
nodeRes := &v1.ResourceList{
v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
}
_, err := createNodes(cs, "test-node", nodeRes, 1)
if err != nil {
t.Fatalf("Cannot create nodes: %v", err)
}
perPlugin.failPermit = false
perPlugin.rejectPermit = false
perPlugin.timeoutPermit = false
perPlugin.waitAndRejectPermit = false
perPlugin.waitAndAllowPermit = true
perPlugin.allowPermit = true
lowPriority, highPriority := int32(100), int32(300)
resourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
}
// Create two pods.
waitingPod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "waiting-pod", Namespace: context.ns.Name, Priority: &lowPriority, Resources: &resourceRequest}))
if err != nil {
t.Errorf("Error while creating the waiting pod: %v", err)
}
// Wait until the waiting-pod is actually waiting, then create a preemptor pod to preempt it.
wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
w := false
perPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
return w, nil
})
preemptorPod, err := createPausePod(cs,
initPausePod(cs, &pausePodConfig{Name: "preemptor-pod", Namespace: context.ns.Name, Priority: &highPriority, Resources: &resourceRequest}))
if err != nil {
t.Errorf("Error while creating the preemptor pod: %v", err)
}
if err = waitForPodToSchedule(cs, preemptorPod); err != nil {
t.Errorf("Expected the preemptor pod to be scheduled. error: %v", err)
}
if _, err := getPod(cs, waitingPod.Name, waitingPod.Namespace); err == nil {
t.Error("Expected the waiting pod to get preempted and deleted")
}
if perPlugin.numPermitCalled == 0 {
t.Errorf("Expected the permit plugin to be called.")
}
perPlugin.reset()
cleanupPods(cs, t, []*v1.Pod{waitingPod, preemptorPod})
}