Merge pull request #88199 from mateuszlitwin/run-permit-plugins-in-scheduling-cycle

run permit plugins in the scheduling cycle
This commit is contained in:
Kubernetes Prow Robot 2020-02-18 11:24:25 -08:00 committed by GitHub
commit d5e0a941aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 155 additions and 76 deletions

View File

@ -735,10 +735,9 @@ func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin,
// RunPermitPlugins runs the set of configured permit plugins. If any of these
// plugins returns a status other than "Success" or "Wait", it does not continue
// running the remaining plugins and returns an error. Otherwise, if any of the
// plugins returns "Wait", then this function will block for the timeout period
// returned by the plugin, if the time expires, then it will return an error.
// Note that if multiple plugins asked to wait, then we wait for the minimum
// timeout duration.
// plugins returns "Wait", then this function will create a waiting pod, add it
// to the map of currently waiting pods, and return a status with the "Wait" code.
// The pod remains a waiting pod for the minimum duration returned by the permit plugins.
func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
startTime := time.Now()
defer func() {
@ -750,7 +749,7 @@ func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod
status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() {
if status.IsUnschedulable() {
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
msg := fmt.Sprintf("rejected pod %q by permit plugin %q: %v", pod.Name, pl.Name(), status.Message())
klog.V(4).Infof(msg)
return NewStatus(status.Code(), msg)
}
@ -768,29 +767,13 @@ func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod
}
}
}
// We now wait for the minimum duration if at least one plugin asked to
// wait (and no plugin rejected the pod)
if statusCode == Wait {
startTime := time.Now()
w := newWaitingPod(pod, pluginsWaitTime)
f.waitingPods.add(w)
defer f.waitingPods.remove(pod.UID)
klog.V(4).Infof("waiting for pod %q at permit", pod.Name)
s := <-w.s
metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))
if !s.IsSuccess() {
if s.IsUnschedulable() {
msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message())
klog.V(4).Infof(msg)
return NewStatus(s.Code(), msg)
}
msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
waitingPod := newWaitingPod(pod, pluginsWaitTime)
f.waitingPods.add(waitingPod)
msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
klog.V(4).Infof(msg)
return NewStatus(Wait, msg)
}
return nil
}
@ -804,6 +787,32 @@ func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state
return status, timeout
}
// WaitOnPermit blocks, if the pod is registered as a waiting pod, until that
// waiting pod is allowed or rejected.
//
// It returns nil immediately when the pod is not in the waiting pods map, and
// nil when the status received for the waiting pod is a success. If the
// received status is unschedulable, a status with the same code is returned;
// any other non-success status is reported as Error. The waiting pod is
// removed from the map before the function returns, and the time spent
// blocked is recorded in the PermitWaitDuration metric, labelled with the
// received status code.
//
// NOTE(review): ctx is not consulted while blocking; the wait ends only when
// a status is delivered on the waiting pod's channel.
func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Status) {
	waitingPod := f.waitingPods.get(pod.UID)
	if waitingPod == nil {
		// The pod was never asked to wait, so there is nothing to block on.
		return nil
	}
	defer f.waitingPods.remove(pod.UID)
	klog.V(4).Infof("pod %q waiting on permit", pod.Name)

	startTime := time.Now()
	s := <-waitingPod.s
	metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))

	if s.IsSuccess() {
		return nil
	}
	if s.IsUnschedulable() {
		msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message())
		// msg is already fully formatted. Log it with Info rather than Infof so
		// that a '%' inside the status message is not treated as a format verb.
		klog.V(4).Info(msg)
		return NewStatus(s.Code(), msg)
	}
	msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message())
	klog.Error(msg)
	return NewStatus(Error, msg)
}
// SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo
// snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains
// unchanged until a pod finishes "Reserve". There is no guarantee that the information
@ -819,7 +828,10 @@ func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) {
// GetWaitingPod returns a reference to a WaitingPod given its UID.
func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
return f.waitingPods.get(uid)
if wp := f.waitingPods.get(uid); wp != nil {
return wp
}
return nil // Returning nil instead of *waitingPod(nil).
}
// RejectWaitingPod rejects a WaitingPod given its UID.

View File

@ -1217,7 +1217,7 @@ func TestPermitPlugins(t *testing.T) {
inj: injectedResult{PermitStatus: int(Unschedulable)},
},
},
want: NewStatus(Unschedulable, `rejected by "TestPlugin" at permit: injected status`),
want: NewStatus(Unschedulable, `rejected pod "" by permit plugin "TestPlugin": injected status`),
},
{
name: "ErrorPermitPlugin",
@ -1237,7 +1237,7 @@ func TestPermitPlugins(t *testing.T) {
inj: injectedResult{PermitStatus: int(UnschedulableAndUnresolvable)},
},
},
want: NewStatus(UnschedulableAndUnresolvable, `rejected by "TestPlugin" at permit: injected status`),
want: NewStatus(UnschedulableAndUnresolvable, `rejected pod "" by permit plugin "TestPlugin": injected status`),
},
{
name: "WaitPermitPlugin",
@ -1247,7 +1247,7 @@ func TestPermitPlugins(t *testing.T) {
inj: injectedResult{PermitStatus: int(Wait)},
},
},
want: NewStatus(Unschedulable, `pod "" rejected while waiting at permit: rejected due to timeout after waiting 0s at plugin TestPlugin`),
want: NewStatus(Wait, `one or more plugins asked to wait and no plugin rejected pod ""`),
},
{
name: "SuccessSuccessPermitPlugin",
@ -1425,6 +1425,13 @@ func TestRecordingMetrics(t *testing.T) {
wantExtensionPoint: "Permit",
wantStatus: Error,
},
{
name: "Permit - Wait",
action: func(f Framework) { f.RunPermitPlugins(context.Background(), state, pod, "") },
inject: injectedResult{PermitStatus: int(Wait)},
wantExtensionPoint: "Permit",
wantStatus: Wait,
},
}
for _, tt := range tests {
@ -1578,17 +1585,17 @@ func TestRunBindPlugins(t *testing.T) {
}
}
func TestPermitWaitingMetric(t *testing.T) {
func TestPermitWaitDurationMetric(t *testing.T) {
tests := []struct {
name string
inject injectedResult
wantRes string
}{
{
name: "Permit - Success",
name: "WaitOnPermit - No Wait",
},
{
name: "Permit - Wait Timeout",
name: "WaitOnPermit - Wait Timeout",
inject: injectedResult{PermitStatus: int(Wait)},
wantRes: "Unschedulable",
},
@ -1617,13 +1624,14 @@ func TestPermitWaitingMetric(t *testing.T) {
}
f.RunPermitPlugins(context.TODO(), nil, pod, "")
f.WaitOnPermit(context.TODO(), pod)
collectAndComparePermitWaitDuration(t, tt.wantRes)
})
}
}
func TestRejectWaitingPod(t *testing.T) {
func TestWaitOnPermit(t *testing.T) {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
@ -1631,33 +1639,65 @@ func TestRejectWaitingPod(t *testing.T) {
},
}
testPermitPlugin := &TestPermitPlugin{}
r := make(Registry)
r.Register(permitPlugin,
func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
return testPermitPlugin, nil
})
plugins := &config.Plugins{
Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: permitPlugin, Weight: 1}}},
tests := []struct {
name string
action func(f Framework)
wantStatus Code
wantMessage string
}{
{
name: "Reject Waiting Pod",
action: func(f Framework) {
f.GetWaitingPod(pod.UID).Reject("reject message")
},
wantStatus: Unschedulable,
wantMessage: "pod \"pod\" rejected while waiting on permit: reject message",
},
{
name: "Allow Waiting Pod",
action: func(f Framework) {
f.GetWaitingPod(pod.UID).Allow(permitPlugin)
},
wantStatus: Success,
wantMessage: "",
},
}
f, err := newFrameworkWithQueueSortAndBind(r, plugins, emptyArgs)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
go func() {
for {
waitingPod := f.GetWaitingPod(pod.UID)
if waitingPod != nil {
break
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testPermitPlugin := &TestPermitPlugin{}
r := make(Registry)
r.Register(permitPlugin,
func(_ *runtime.Unknown, fh FrameworkHandle) (Plugin, error) {
return testPermitPlugin, nil
})
plugins := &config.Plugins{
Permit: &config.PluginSet{Enabled: []config.Plugin{{Name: permitPlugin, Weight: 1}}},
}
}
f.RejectWaitingPod(pod.UID)
}()
permitStatus := f.RunPermitPlugins(context.Background(), nil, pod, "")
if permitStatus.Message() != "pod \"pod\" rejected while waiting at permit: removed" {
t.Fatalf("RejectWaitingPod failed, permitStatus: %v", permitStatus)
f, err := newFrameworkWithQueueSortAndBind(r, plugins, emptyArgs)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
runPermitPluginsStatus := f.RunPermitPlugins(context.Background(), nil, pod, "")
if runPermitPluginsStatus.Code() != Wait {
t.Fatalf("Expected RunPermitPlugins to return status %v, but got %v",
Wait, runPermitPluginsStatus.Code())
}
go tt.action(f)
waitOnPermitStatus := f.WaitOnPermit(context.Background(), pod)
if waitOnPermitStatus.Code() != tt.wantStatus {
t.Fatalf("Expected WaitOnPermit to return status %v, but got %v",
tt.wantStatus, waitOnPermitStatus.Code())
}
if waitOnPermitStatus.Message() != tt.wantMessage {
t.Fatalf("Expected WaitOnPermit to return status with message %q, but got %q",
tt.wantMessage, waitOnPermitStatus.Message())
}
})
}
}

View File

@ -468,12 +468,14 @@ type Framework interface {
// RunPermitPlugins runs the set of configured permit plugins. If any of these
// plugins returns a status other than "Success" or "Wait", it does not continue
// running the remaining plugins and returns an error. Otherwise, if any of the
// plugins returns "Wait", then this function will block for the timeout period
// returned by the plugin, if the time expires, then it will return an error.
// Note that if multiple plugins asked to wait, then we wait for the minimum
// timeout duration.
// plugins returns "Wait", then this function will create a waiting pod, add it
// to the map of currently waiting pods, and return a status with the "Wait" code.
// The pod remains a waiting pod for the minimum duration returned by the permit plugins.
RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
// RunBindPlugins runs the set of configured bind plugins. A bind plugin may choose
// whether or not to handle the given Pod. If a bind plugin chooses to skip the
// binding, it should return code=5("skip") status. Otherwise, it should return "Error"
@ -497,9 +499,9 @@ type Framework interface {
type FrameworkHandle interface {
// SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until
// a pod finishes "Reserve" point. There is no guarantee that the information
// a pod finishes "Permit" point. There is no guarantee that the information
// remains unchanged in the binding phase of scheduling, so plugins in the binding
// cycle(permit/pre-bind/bind/post-bind/un-reserve plugin) should not use it,
// cycle (pre-bind/bind/post-bind/un-reserve plugin) should not use it,
// otherwise a concurrent read/write error might occur, they should use scheduler
// cache instead.
SnapshotSharedLister() schedulerlisters.SharedLister

View File

@ -27,19 +27,19 @@ import (
// waitingPodsMap a thread-safe map used to maintain pods waiting in the permit phase.
type waitingPodsMap struct {
pods map[types.UID]WaitingPod
pods map[types.UID]*waitingPod
mu sync.RWMutex
}
// newWaitingPodsMap returns a new waitingPodsMap.
func newWaitingPodsMap() *waitingPodsMap {
return &waitingPodsMap{
pods: make(map[types.UID]WaitingPod),
pods: make(map[types.UID]*waitingPod),
}
}
// add a new WaitingPod to the map.
func (m *waitingPodsMap) add(wp WaitingPod) {
func (m *waitingPodsMap) add(wp *waitingPod) {
m.mu.Lock()
defer m.mu.Unlock()
m.pods[wp.GetPod().UID] = wp
@ -53,11 +53,10 @@ func (m *waitingPodsMap) remove(uid types.UID) {
}
// get a WaitingPod from the map.
func (m *waitingPodsMap) get(uid types.UID) WaitingPod {
func (m *waitingPodsMap) get(uid types.UID) *waitingPod {
m.mu.RLock()
defer m.mu.RUnlock()
return m.pods[uid]
}
// iterate acquires a read lock and iterates over the WaitingPods map.
@ -77,11 +76,17 @@ type waitingPod struct {
mu sync.RWMutex
}
var _ WaitingPod = &waitingPod{}
// newWaitingPod returns a new waitingPod instance.
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
wp := &waitingPod{
pod: pod,
s: make(chan *Status),
// Allow() and Reject() calls are non-blocking. This property is guaranteed
// by using a non-blocking send to this channel. The channel has a buffer of size 1
// so that a non-blocking send is not dropped when the receive from this
// channel happens only after the send.
s: make(chan *Status, 1),
}
wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))

View File

@ -218,7 +218,7 @@ var (
&metrics.HistogramOpts{
Subsystem: SchedulerSubsystem,
Name: "permit_wait_duration_seconds",
Help: "Duration of waiting in RunPermitPlugins.",
Help: "Duration of waiting on permit.",
Buckets: metrics.ExponentialBuckets(0.001, 2, 15),
StabilityLevel: metrics.ALPHA,
},

View File

@ -639,6 +639,27 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return
}
// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
var reason string
if runPermitStatus.IsUnschedulable() {
metrics.PodScheduleFailures.Inc()
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleErrors.Inc()
reason = SchedulerError
}
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// One of the plugins returned status different than success or wait.
fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message())
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
@ -646,11 +667,10 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
// Run "permit" plugins.
permitStatus := fwk.RunPermitPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !permitStatus.IsSuccess() {
waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
if !waitOnPermitStatus.IsSuccess() {
var reason string
if permitStatus.IsUnschedulable() {
if waitOnPermitStatus.IsUnschedulable() {
metrics.PodScheduleFailures.Inc()
reason = v1.PodReasonUnschedulable
} else {
@ -662,7 +682,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
}
// trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message())
sched.recordSchedulingFailure(assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message())
return
}