From d221d82eaf3756a759530bdd332af9ba755151dd Mon Sep 17 00:00:00 2001 From: Mateusz Date: Sat, 15 Feb 2020 16:28:43 -0800 Subject: [PATCH] run permit plugins in the scheduling cycle --- pkg/scheduler/framework/v1alpha1/framework.go | 66 +++++++----- .../framework/v1alpha1/framework_test.go | 102 ++++++++++++------ pkg/scheduler/framework/v1alpha1/interface.go | 14 +-- .../framework/v1alpha1/waiting_pods_map.go | 17 +-- pkg/scheduler/metrics/metrics.go | 2 +- pkg/scheduler/scheduler.go | 30 +++++- 6 files changed, 155 insertions(+), 76 deletions(-) diff --git a/pkg/scheduler/framework/v1alpha1/framework.go b/pkg/scheduler/framework/v1alpha1/framework.go index 848893ffbdd..6eb24824afd 100644 --- a/pkg/scheduler/framework/v1alpha1/framework.go +++ b/pkg/scheduler/framework/v1alpha1/framework.go @@ -735,10 +735,9 @@ func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin, // RunPermitPlugins runs the set of configured permit plugins. If any of these // plugins returns a status other than "Success" or "Wait", it does not continue // running the remaining plugins and returns an error. Otherwise, if any of the -// plugins returns "Wait", then this function will block for the timeout period -// returned by the plugin, if the time expires, then it will return an error. -// Note that if multiple plugins asked to wait, then we wait for the minimum -// timeout duration. +// plugins returns "Wait", then this function will create and add waiting pod +// to a map of currently waiting pods and return status with "Wait" code. +// Pod will remain waiting pod for the minimum duration returned by the permit plugins. func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { startTime := time.Now() defer func() { @@ -750,7 +749,7 @@ func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { if status.IsUnschedulable() { - msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message()) + msg := fmt.Sprintf("rejected pod %q by permit plugin %q: %v", pod.Name, pl.Name(), status.Message()) klog.V(4).Infof(msg) return NewStatus(status.Code(), msg) } @@ -768,29 +767,13 @@ func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod } } } - - // We now wait for the minimum duration if at least one plugin asked to - // wait (and no plugin rejected the pod) if statusCode == Wait { - startTime := time.Now() - w := newWaitingPod(pod, pluginsWaitTime) - f.waitingPods.add(w) - defer f.waitingPods.remove(pod.UID) - klog.V(4).Infof("waiting for pod %q at permit", pod.Name) - s := <-w.s - metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime)) - if !s.IsSuccess() { - if s.IsUnschedulable() { - msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message()) - klog.V(4).Infof(msg) - return NewStatus(s.Code(), msg) - } - msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message()) - klog.Error(msg) - return NewStatus(Error, msg) - } + waitingPod := newWaitingPod(pod, pluginsWaitTime) + f.waitingPods.add(waitingPod) + msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name) + klog.V(4).Infof(msg) + return NewStatus(Wait, msg) } - return nil } @@ -804,6 +787,32 @@ func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state return status, timeout } +// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed. +func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Status) { + waitingPod := f.waitingPods.get(pod.UID) + if waitingPod == nil { + return nil + } + defer f.waitingPods.remove(pod.UID) + klog.V(4).Infof("pod %q waiting on permit", pod.Name) + + startTime := time.Now() + s := <-waitingPod.s + metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime)) + + if !s.IsSuccess() { + if s.IsUnschedulable() { + msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message()) + klog.V(4).Infof(msg) + return NewStatus(s.Code(), msg) + } + msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message()) + klog.Error(msg) + return NewStatus(Error, msg) + } + return nil +} + // SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo // snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains // unchanged until a pod finishes "Reserve". There is no guarantee that the information @@ -819,7 +828,10 @@ func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) { // GetWaitingPod returns a reference to a WaitingPod given its UID. func (f *framework) GetWaitingPod(uid types.UID) WaitingPod { - return f.waitingPods.get(uid) + if wp := f.waitingPods.get(uid); wp != nil { + return wp + } + return nil // Returning nil instead of *waitingPod(nil). } // RejectWaitingPod rejects a WaitingPod given its UID. diff --git a/pkg/scheduler/framework/v1alpha1/framework_test.go b/pkg/scheduler/framework/v1alpha1/framework_test.go index 76990a232d7..440760a62c4 100644 --- a/pkg/scheduler/framework/v1alpha1/framework_test.go +++ b/pkg/scheduler/framework/v1alpha1/framework_test.go @@ -1217,7 +1217,7 @@ func TestPermitPlugins(t *testing.T) { inj: injectedResult{PermitStatus: int(Unschedulable)}, }, }, - want: NewStatus(Unschedulable, `rejected by "TestPlugin" at permit: injected status`), + want: NewStatus(Unschedulable, `rejected pod "" by permit plugin "TestPlugin": injected status`), }, { name: "ErrorPermitPlugin", @@ -1237,7 +1237,7 @@ func TestPermitPlugins(t *testing.T) { inj: injectedResult{PermitStatus: int(UnschedulableAndUnresolvable)}, }, }, - want: NewStatus(UnschedulableAndUnresolvable, `rejected by "TestPlugin" at permit: injected status`), + want: NewStatus(UnschedulableAndUnresolvable, `rejected pod "" by permit plugin "TestPlugin": injected status`), }, { name: "WaitPermitPlugin", @@ -1247,7 +1247,7 @@ func TestPermitPlugins(t *testing.T) { inj: injectedResult{PermitStatus: int(Wait)}, }, }, - want: NewStatus(Unschedulable, `pod "" rejected while waiting at permit: rejected due to timeout after waiting 0s at plugin TestPlugin`), + want: NewStatus(Wait, `one or more plugins asked to wait and no plugin rejected pod ""`), }, { name: "SuccessSuccessPermitPlugin", @@ -1425,6 +1425,13 @@ func TestRecordingMetrics(t *testing.T) { wantExtensionPoint: "Permit", wantStatus: Error, }, + { + name: "Permit - Wait", + action: func(f Framework) { f.RunPermitPlugins(context.Background(), state, pod, "") }, + inject: injectedResult{PermitStatus: int(Wait)}, + wantExtensionPoint: "Permit", + wantStatus: Wait, + }, } for _, tt := range tests { @@ -1578,17 +1585,17 @@ func TestRunBindPlugins(t *testing.T) { } } -func TestPermitWaitingMetric(t *testing.T) { +func TestPermitWaitDurationMetric(t *testing.T) { tests := []struct { name string inject injectedResult wantRes string }{ { - name: "Permit - Success", + name: "WaitOnPermit - No Wait", }, { - name: "Permit - Wait Timeout", + name: "WaitOnPermit - Wait Timeout", inject: injectedResult{PermitStatus: int(Wait)}, wantRes: "Unschedulable", }, @@ -1617,13 +1624,14 @@ func TestPermitWaitingMetric(t *testing.T) { } f.RunPermitPlugins(context.TODO(), nil, pod, "") + f.WaitOnPermit(context.TODO(), pod) collectAndComparePermitWaitDuration(t, tt.wantRes) }) } } -func TestRejectWaitingPod(t *testing.T) { +func TestWaitOnPermit(t *testing.T) { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "pod", @@ -1631,33 +1639,65 @@ func TestRejectWaitingPod(t *testing.T) { }, } - testPermitPlugin := &TestPermitPlugin{} - r := make(Registry) - r.Register(permitPlugin, - func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) { - return testPermitPlugin, nil - }) - plugins := &config.Plugins{ - Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: permitPlugin, Weight: 1}}}, + tests := []struct { + name string + action func(f Framework) + wantStatus Code + wantMessage string + }{ + { + name: "Reject Waiting Pod", + action: func(f Framework) { + f.GetWaitingPod(pod.UID).Reject("reject message") + }, + wantStatus: Unschedulable, + wantMessage: "pod \"pod\" rejected while waiting on permit: reject message", + }, + { + name: "Allow Waiting Pod", + action: func(f Framework) { + f.GetWaitingPod(pod.UID).Allow(permitPlugin) + }, + wantStatus: Success, + wantMessage: "", + }, } - f, err := newFrameworkWithQueueSortAndBind(r, plugins, emptyArgs) - if err != nil { - t.Fatalf("Failed to create framework for testing: %v", err) - } - - go func() { - for { - waitingPod := f.GetWaitingPod(pod.UID) - if waitingPod != nil { - break + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testPermitPlugin := &TestPermitPlugin{} + r := make(Registry) + r.Register(permitPlugin, + func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) { + return testPermitPlugin, nil + }) + plugins := &config.Plugins{ + Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: permitPlugin, Weight: 1}}}, } - } - f.RejectWaitingPod(pod.UID) - }() - permitStatus := f.RunPermitPlugins(context.Background(), nil, pod, "") - if permitStatus.Message() != "pod \"pod\" rejected while waiting at permit: removed" { - t.Fatalf("RejectWaitingPod failed, permitStatus: %v", permitStatus) + + f, err := newFrameworkWithQueueSortAndBind(r, plugins, emptyArgs) + if err != nil { + t.Fatalf("Failed to create framework for testing: %v", err) + } + + runPermitPluginsStatus := f.RunPermitPlugins(context.Background(), nil, pod, "") + if runPermitPluginsStatus.Code() != Wait { + t.Fatalf("Expected RunPermitPlugins to return status %v, but got %v", + Wait, runPermitPluginsStatus.Code()) + } + + go tt.action(f) + + waitOnPermitStatus := f.WaitOnPermit(context.Background(), pod) + if waitOnPermitStatus.Code() != tt.wantStatus { + t.Fatalf("Expected WaitOnPermit to return status %v, but got %v", + tt.wantStatus, waitOnPermitStatus.Code()) + } + if waitOnPermitStatus.Message() != tt.wantMessage { + t.Fatalf("Expected WaitOnPermit to return status with message %q, but got %q", + tt.wantMessage, waitOnPermitStatus.Message()) + } + }) } } diff --git a/pkg/scheduler/framework/v1alpha1/interface.go b/pkg/scheduler/framework/v1alpha1/interface.go index e7a2c05cceb..6b9b3ca9578 100644 --- a/pkg/scheduler/framework/v1alpha1/interface.go +++ b/pkg/scheduler/framework/v1alpha1/interface.go @@ -468,12 +468,14 @@ type Framework interface { // RunPermitPlugins runs the set of configured permit plugins. If any of these // plugins returns a status other than "Success" or "Wait", it does not continue // running the remaining plugins and returns an error. Otherwise, if any of the - // plugins returns "Wait", then this function will block for the timeout period - // returned by the plugin, if the time expires, then it will return an error. - // Note that if multiple plugins asked to wait, then we wait for the minimum - // timeout duration. + // plugins returns "Wait", then this function will create and add waiting pod + // to a map of currently waiting pods and return status with "Wait" code. + // Pod will remain waiting pod for the minimum duration returned by the permit plugins. RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status + // WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed. + WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status + // RunBindPlugins runs the set of configured bind plugins. A bind plugin may choose // whether or not to handle the given Pod. If a bind plugin chooses to skip the // binding, it should return code=5("skip") status. Otherwise, it should return "Error" @@ -497,9 +499,9 @@ type Framework interface { type FrameworkHandle interface { // SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot // is taken at the beginning of a scheduling cycle and remains unchanged until - // a pod finishes "Reserve" point. There is no guarantee that the information + // a pod finishes "Permit" point. There is no guarantee that the information // remains unchanged in the binding phase of scheduling, so plugins in the binding - // cycle(permit/pre-bind/bind/post-bind/un-reserve plugin) should not use it, + // cycle (pre-bind/bind/post-bind/un-reserve plugin) should not use it, // otherwise a concurrent read/write error might occur, they should use scheduler // cache instead. SnapshotSharedLister() schedulerlisters.SharedLister diff --git a/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go b/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go index 3eaa0382dc8..28c58c40635 100644 --- a/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go +++ b/pkg/scheduler/framework/v1alpha1/waiting_pods_map.go @@ -27,19 +27,19 @@ import ( // waitingPodsMap a thread-safe map used to maintain pods waiting in the permit phase. type waitingPodsMap struct { - pods map[types.UID]WaitingPod + pods map[types.UID]*waitingPod mu sync.RWMutex } // newWaitingPodsMap returns a new waitingPodsMap. func newWaitingPodsMap() *waitingPodsMap { return &waitingPodsMap{ - pods: make(map[types.UID]WaitingPod), + pods: make(map[types.UID]*waitingPod), } } // add a new WaitingPod to the map. -func (m *waitingPodsMap) add(wp WaitingPod) { +func (m *waitingPodsMap) add(wp *waitingPod) { m.mu.Lock() defer m.mu.Unlock() m.pods[wp.GetPod().UID] = wp @@ -53,11 +53,10 @@ func (m *waitingPodsMap) remove(uid types.UID) { } // get a WaitingPod from the map. -func (m *waitingPodsMap) get(uid types.UID) WaitingPod { +func (m *waitingPodsMap) get(uid types.UID) *waitingPod { m.mu.RLock() defer m.mu.RUnlock() return m.pods[uid] - } // iterate acquires a read lock and iterates over the WaitingPods map. @@ -77,11 +76,17 @@ type waitingPod struct { mu sync.RWMutex } +var _ WaitingPod = &waitingPod{} + // newWaitingPod returns a new waitingPod instance. func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod { wp := &waitingPod{ pod: pod, - s: make(chan *Status), + // Allow() and Reject() calls are non-blocking. This property is guaranteed + // by using non-blocking send to this channel. This channel has a buffer of size 1 + // to ensure that non-blocking send will not be ignored - possible situation when + // receiving from this channel happens after non-blocking send. + s: make(chan *Status, 1), } wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime)) diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index 095b991ce94..bdcdb6feb05 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -218,7 +218,7 @@ var ( &metrics.HistogramOpts{ Subsystem: SchedulerSubsystem, Name: "permit_wait_duration_seconds", - Help: "Duration of waiting in RunPermitPlugins.", + Help: "Duration of waiting on permit.", Buckets: metrics.ExponentialBuckets(0.001, 2, 15), StabilityLevel: metrics.ALPHA, }, diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 535355426cd..89411a3f940 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -639,6 +639,27 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) return } + + // Run "permit" plugins. + runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() { + var reason string + if runPermitStatus.IsUnschedulable() { + metrics.PodScheduleFailures.Inc() + reason = v1.PodReasonUnschedulable + } else { + metrics.PodScheduleErrors.Inc() + reason = SchedulerError + } + if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil { + klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr) + } + // One of the plugins returned status different than success or wait. + fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) + sched.recordSchedulingFailure(assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message()) + return + } + // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). go func() { bindingCycleCtx, cancel := context.WithCancel(ctx) @@ -646,11 +667,10 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { metrics.SchedulerGoroutines.WithLabelValues("binding").Inc() defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec() - // Run "permit" plugins. - permitStatus := fwk.RunPermitPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - if !permitStatus.IsSuccess() { + waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod) + if !waitOnPermitStatus.IsSuccess() { var reason string - if permitStatus.IsUnschedulable() { + if waitOnPermitStatus.IsUnschedulable() { metrics.PodScheduleFailures.Inc() reason = v1.PodReasonUnschedulable } else { @@ -662,7 +682,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) { } // trigger un-reserve plugins to clean up state associated with the reserved Pod fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) - sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message()) + sched.recordSchedulingFailure(assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message()) return }