Merge pull request #84337 from hex108/cancel_context

Cancel context when RunPermitPlugins finishes
This commit is contained in:
Kubernetes Prow Robot 2019-11-07 09:04:10 -08:00 committed by GitHub
commit d22e04c181
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 95 additions and 66 deletions

View File

@ -613,7 +613,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
// Synchronously attempt to find a fit for the pod.
start := time.Now()
state := framework.NewCycleState()
scheduleResult, err := sched.Algorithm.Schedule(ctx, state, pod)
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
if err != nil {
sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
// Schedule() may have failed because the pod would not fit on any host, so we try to
@ -626,7 +628,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
" No preemption is performed.")
} else {
preemptionStartTime := time.Now()
sched.preempt(ctx, state, fwk, pod, fitError)
sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
@ -666,7 +668,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
}
// Run "reserve" plugins.
if sts := fwk.RunReservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
if sts := fwk.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
metrics.PodScheduleErrors.Inc()
return
@ -683,11 +685,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
@ -698,13 +702,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error())
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
}
// Run "permit" plugins.
permitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
permitStatus := fwk.RunPermitPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !permitStatus.IsSuccess() {
var reason string
if permitStatus.IsUnschedulable() {
@ -718,13 +722,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message())
return
}
// Run "prebind" plugins.
preBindStatus := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() {
var reason string
metrics.PodScheduleErrors.Inc()
@ -733,18 +737,18 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
return
}
err := sched.bind(ctx, assumedPod, scheduleResult.SuggestedHost, state)
err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {
metrics.PodScheduleErrors.Inc()
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
} else {
// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
@ -757,7 +761,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
// Run "postbind" plugins.
fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
}
}()
}

View File

@ -101,6 +101,7 @@ type PermitPlugin struct {
waitAndRejectPermit bool
waitAndAllowPermit bool
allowPermit bool
cancelled bool
fh framework.FrameworkHandle
}
@ -395,8 +396,15 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState,
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
}
if pp.timeoutPermit {
go func() {
select {
case <-ctx.Done():
pp.cancelled = true
}
}()
return framework.NewStatus(framework.Wait, ""), 3 * time.Second
}
if pp.allowPermit && pod.Name != "waiting-pod" {
return nil, 0
}
@ -429,6 +437,11 @@ func (pp *PermitPlugin) allowAllPods() {
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow(pp.name) })
}
// rejectAllPods rejects all waiting pods.
func (pp *PermitPlugin) rejectAllPods() {
pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject("rejectAllPods") })
}
// reset used to reset permit plugin.
func (pp *PermitPlugin) reset() {
pp.numPermitCalled = 0
@ -438,6 +451,7 @@ func (pp *PermitPlugin) reset() {
pp.waitAndRejectPermit = false
pp.waitAndAllowPermit = false
pp.allowPermit = false
pp.cancelled = false
}
// newPermitPlugin returns a factory for permit plugin with specified PermitPlugin.
@ -1079,18 +1093,7 @@ func TestPostBindPlugin(t *testing.T) {
func TestPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
perPlugin := &PermitPlugin{name: permitPluginName}
registry := framework.Registry{permitPluginName: newPermitPlugin(perPlugin)}
// Setup initial permit plugin for testing.
plugins := &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: permitPluginName,
},
},
},
}
registry, plugins := initRegistryAndConfig(perPlugin)
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
@ -1178,24 +1181,7 @@ func TestMultiplePermitPlugins(t *testing.T) {
// Create a plugin registry for testing.
perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
registry := framework.Registry{
perPlugin1.Name(): newPermitPlugin(perPlugin1),
perPlugin2.Name(): newPermitPlugin(perPlugin2),
}
// Setup initial permit plugins for testing.
plugins := &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: perPlugin1.Name(),
},
{
Name: perPlugin2.Name(),
},
},
},
}
registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2)
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "multi-permit-plugin", nil), 2,
@ -1245,22 +1231,53 @@ func TestMultiplePermitPlugins(t *testing.T) {
cleanupPods(context.clientSet, t, []*v1.Pod{pod})
}
// TestPermitPluginsCancelled tests whether all permit plugins are cancelled when pod is rejected.
func TestPermitPluginsCancelled(t *testing.T) {
// Create a plugin registry for testing.
perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2)
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugins", nil), 2,
scheduler.WithFrameworkPlugins(plugins),
scheduler.WithFrameworkDefaultRegistry(registry))
defer cleanupTest(t, context)
// Both permit plugins will return Wait for permitting
perPlugin1.timeoutPermit = true
perPlugin2.timeoutPermit = true
// Create a test pod.
podName := "test-pod"
pod, err := createPausePod(context.clientSet,
initPausePod(context.clientSet, &pausePodConfig{Name: podName, Namespace: context.ns.Name}))
if err != nil {
t.Errorf("Error while creating a test pod: %v", err)
}
var waitingPod framework.WaitingPod
// Wait until the test pod is actually waiting.
wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID)
return waitingPod != nil, nil
})
perPlugin1.rejectAllPods()
// Wait some time for the permit plugins to be cancelled
err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
return perPlugin1.cancelled && perPlugin2.cancelled, nil
})
if err != nil {
t.Errorf("Expected all permit plugins to be cancelled")
}
}
// TestCoSchedulingWithPermitPlugin tests invocation of permit plugins.
func TestCoSchedulingWithPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
permitPlugin := &PermitPlugin{name: permitPluginName}
registry := framework.Registry{permitPluginName: newPermitPlugin(permitPlugin)}
// Setup initial permit plugin for testing.
plugins := &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: permitPluginName,
},
},
},
}
registry, plugins := initRegistryAndConfig(permitPlugin)
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2,
@ -1432,18 +1449,7 @@ func TestPostFilterPlugin(t *testing.T) {
func TestPreemptWithPermitPlugin(t *testing.T) {
// Create a plugin registry for testing. Register only a permit plugin.
permitPlugin := &PermitPlugin{}
registry := framework.Registry{permitPluginName: newPermitPlugin(permitPlugin)}
// Setup initial permit plugin for testing.
plugins := &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: permitPluginName,
},
},
},
}
registry, plugins := initRegistryAndConfig(permitPlugin)
// Create the master and the scheduler with the test plugin set.
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0,
@ -1522,3 +1528,22 @@ func initTestSchedulerForFrameworkTest(t *testing.T, context *testContext, nodeC
}
return c
}
// initRegistryAndConfig returns registry and plugins config based on give plugins.
// TODO: refactor it to a more generic functions that accepts all kinds of Plugins as arguments
func initRegistryAndConfig(pp ...*PermitPlugin) (registry framework.Registry, plugins *schedulerconfig.Plugins) {
if len(pp) == 0 {
return
}
registry = framework.Registry{}
plugins = &schedulerconfig.Plugins{
Permit: &schedulerconfig.PluginSet{},
}
for _, p := range pp {
registry.Register(p.Name(), newPermitPlugin(p))
plugins.Permit.Enabled = append(plugins.Permit.Enabled, schedulerconfig.Plugin{Name: p.Name()})
}
return
}