From 81b705960f24416e8d560f05b3cc24a6bf370e56 Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Fri, 25 Oct 2019 14:46:50 +0800 Subject: [PATCH] Cancel context to make sure all plugins are cancelled when each schedule finishes --- pkg/scheduler/scheduler.go | 28 ++-- test/integration/scheduler/framework_test.go | 133 +++++++++++-------- 2 files changed, 95 insertions(+), 66 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index cf92a9a68a4..1d34a862a03 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -614,7 +614,9 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { // Synchronously attempt to find a fit for the pod. start := time.Now() state := framework.NewCycleState() - scheduleResult, err := sched.Algorithm.Schedule(ctx, state, pod) + schedulingCycleCtx, cancel := context.WithCancel(ctx) + defer cancel() + scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod) if err != nil { sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error()) // Schedule() may have failed because the pod would not fit on any host, so we try to @@ -627,7 +629,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { " No preemption is performed.") } else { preemptionStartTime := time.Now() - sched.preempt(ctx, state, fwk, pod, fitError) + sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError) metrics.PreemptionAttempts.Inc() metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime)) metrics.DeprecatedSchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime)) @@ -667,7 +669,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { } // Run "reserve" plugins. 
- if sts := fwk.RunReservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { + if sts := fwk.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { sched.recordSchedulingFailure(assumedPodInfo, sts.AsError(), SchedulerError, sts.Message()) metrics.PodScheduleErrors.Inc() return @@ -684,11 +686,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err)) metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) return } // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). go func() { + bindingCycleCtx, cancel := context.WithCancel(ctx) + defer cancel() metrics.SchedulerGoroutines.WithLabelValues("binding").Inc() defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec() @@ -699,13 +703,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { sched.recordSchedulingFailure(assumedPodInfo, err, "VolumeBindingFailed", err.Error()) metrics.PodScheduleErrors.Inc() // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) return } } // Run "permit" plugins. 
- permitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) + permitStatus := fwk.RunPermitPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if !permitStatus.IsSuccess() { var reason string if permitStatus.IsUnschedulable() { @@ -719,13 +723,13 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message()) return } // Run "prebind" plugins. - preBindStatus := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) + preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if !preBindStatus.IsSuccess() { var reason string metrics.PodScheduleErrors.Inc() @@ -734,18 +738,18 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) } // trigger un-reserve plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message()) return } - err := sched.bind(ctx, assumedPod, scheduleResult.SuggestedHost, state) + err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state) metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start)) metrics.DeprecatedE2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start)) if err != nil { metrics.PodScheduleErrors.Inc() // trigger un-reserve 
plugins to clean up state associated with the reserved Pod - fwk.RunUnreservePlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) sched.recordSchedulingFailure(assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err)) } else { // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. @@ -758,7 +762,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp)) // Run "postbind" plugins. - fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) + fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) } }() } diff --git a/test/integration/scheduler/framework_test.go b/test/integration/scheduler/framework_test.go index 5d28870a8de..58c68b1e19e 100644 --- a/test/integration/scheduler/framework_test.go +++ b/test/integration/scheduler/framework_test.go @@ -101,6 +101,7 @@ type PermitPlugin struct { waitAndRejectPermit bool waitAndAllowPermit bool allowPermit bool + cancelled bool fh framework.FrameworkHandle } @@ -395,8 +396,15 @@ func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0 } if pp.timeoutPermit { + go func() { + select { + case <-ctx.Done(): + pp.cancelled = true + } + }() return framework.NewStatus(framework.Wait, ""), 3 * time.Second } + if pp.allowPermit && pod.Name != "waiting-pod" { return nil, 0 } @@ -429,6 +437,11 @@ func (pp *PermitPlugin) allowAllPods() { pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow(pp.name) }) } +// rejectAllPods rejects all waiting pods. 
+func (pp *PermitPlugin) rejectAllPods() { + pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject("rejectAllPods") }) +} + // reset used to reset permit plugin. func (pp *PermitPlugin) reset() { pp.numPermitCalled = 0 @@ -438,6 +451,7 @@ func (pp *PermitPlugin) reset() { pp.waitAndRejectPermit = false pp.waitAndAllowPermit = false pp.allowPermit = false + pp.cancelled = false } // newPermitPlugin returns a factory for permit plugin with specified PermitPlugin. @@ -1079,18 +1093,7 @@ func TestPostBindPlugin(t *testing.T) { func TestPermitPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a permit plugin. perPlugin := &PermitPlugin{name: permitPluginName} - registry := framework.Registry{permitPluginName: newPermitPlugin(perPlugin)} - - // Setup initial permit plugin for testing. - plugins := &schedulerconfig.Plugins{ - Permit: &schedulerconfig.PluginSet{ - Enabled: []schedulerconfig.Plugin{ - { - Name: permitPluginName, - }, - }, - }, - } + registry, plugins := initRegistryAndConfig(perPlugin) // Create the master and the scheduler with the test plugin set. context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2, @@ -1178,24 +1181,7 @@ func TestMultiplePermitPlugins(t *testing.T) { // Create a plugin registry for testing. perPlugin1 := &PermitPlugin{name: "permit-plugin-1"} perPlugin2 := &PermitPlugin{name: "permit-plugin-2"} - registry := framework.Registry{ - perPlugin1.Name(): newPermitPlugin(perPlugin1), - perPlugin2.Name(): newPermitPlugin(perPlugin2), - } - - // Setup initial permit plugins for testing. - plugins := &schedulerconfig.Plugins{ - Permit: &schedulerconfig.PluginSet{ - Enabled: []schedulerconfig.Plugin{ - { - Name: perPlugin1.Name(), - }, - { - Name: perPlugin2.Name(), - }, - }, - }, - } + registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2) // Create the master and the scheduler with the test plugin set. 
context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "multi-permit-plugin", nil), 2, @@ -1245,22 +1231,53 @@ func TestMultiplePermitPlugins(t *testing.T) { cleanupPods(context.clientSet, t, []*v1.Pod{pod}) } +// TestPermitPluginsCancelled tests whether all permit plugins are cancelled when pod is rejected. +func TestPermitPluginsCancelled(t *testing.T) { + // Create a plugin registry for testing. + perPlugin1 := &PermitPlugin{name: "permit-plugin-1"} + perPlugin2 := &PermitPlugin{name: "permit-plugin-2"} + registry, plugins := initRegistryAndConfig(perPlugin1, perPlugin2) + + // Create the master and the scheduler with the test plugin set. + context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugins", nil), 2, + scheduler.WithFrameworkPlugins(plugins), + scheduler.WithFrameworkDefaultRegistry(registry)) + defer cleanupTest(t, context) + + // Both permit plugins will return Wait for permitting + perPlugin1.timeoutPermit = true + perPlugin2.timeoutPermit = true + + // Create a test pod. + podName := "test-pod" + pod, err := createPausePod(context.clientSet, + initPausePod(context.clientSet, &pausePodConfig{Name: podName, Namespace: context.ns.Name})) + if err != nil { + t.Errorf("Error while creating a test pod: %v", err) + } + + var waitingPod framework.WaitingPod + // Wait until the test pod is actually waiting. + wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { + waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID) + return waitingPod != nil, nil + }) + + perPlugin1.rejectAllPods() + // Wait some time for the permit plugins to be cancelled + err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { + return perPlugin1.cancelled && perPlugin2.cancelled, nil + }) + if err != nil { + t.Errorf("Expected all permit plugins to be cancelled") + } +} + // TestCoSchedulingWithPermitPlugin tests invocation of permit plugins. 
func TestCoSchedulingWithPermitPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a permit plugin. permitPlugin := &PermitPlugin{name: permitPluginName} - registry := framework.Registry{permitPluginName: newPermitPlugin(permitPlugin)} - - // Setup initial permit plugin for testing. - plugins := &schedulerconfig.Plugins{ - Permit: &schedulerconfig.PluginSet{ - Enabled: []schedulerconfig.Plugin{ - { - Name: permitPluginName, - }, - }, - }, - } + registry, plugins := initRegistryAndConfig(permitPlugin) // Create the master and the scheduler with the test plugin set. context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "permit-plugin", nil), 2, @@ -1432,18 +1449,7 @@ func TestPostFilterPlugin(t *testing.T) { func TestPreemptWithPermitPlugin(t *testing.T) { // Create a plugin registry for testing. Register only a permit plugin. permitPlugin := &PermitPlugin{} - registry := framework.Registry{permitPluginName: newPermitPlugin(permitPlugin)} - - // Setup initial permit plugin for testing. - plugins := &schedulerconfig.Plugins{ - Permit: &schedulerconfig.PluginSet{ - Enabled: []schedulerconfig.Plugin{ - { - Name: permitPluginName, - }, - }, - }, - } + registry, plugins := initRegistryAndConfig(permitPlugin) // Create the master and the scheduler with the test plugin set. context := initTestSchedulerForFrameworkTest(t, initTestMaster(t, "preempt-with-permit-plugin", nil), 0, @@ -1522,3 +1528,22 @@ func initTestSchedulerForFrameworkTest(t *testing.T, context *testContext, nodeC } return c } + +// initRegistryAndConfig returns registry and plugins config based on the given plugins. 
+// TODO: refactor it into a more generic function that accepts all kinds of Plugins as arguments +func initRegistryAndConfig(pp ...*PermitPlugin) (registry framework.Registry, plugins *schedulerconfig.Plugins) { + if len(pp) == 0 { + return + } + + registry = framework.Registry{} + plugins = &schedulerconfig.Plugins{ + Permit: &schedulerconfig.PluginSet{}, + } + + for _, p := range pp { + registry.Register(p.Name(), newPermitPlugin(p)) + plugins.Permit.Enabled = append(plugins.Permit.Enabled, schedulerconfig.Plugin{Name: p.Name()}) + } + return +}