run permit plugins in the scheduling cycle

This commit is contained in:
Mateusz 2020-02-15 16:28:43 -08:00
parent ea5cef1c65
commit d221d82eaf
6 changed files with 155 additions and 76 deletions

View File

@ -735,10 +735,9 @@ func (f *framework) runUnreservePlugin(ctx context.Context, pl UnreservePlugin,
// RunPermitPlugins runs the set of configured permit plugins. If any of these // RunPermitPlugins runs the set of configured permit plugins. If any of these
// plugins returns a status other than "Success" or "Wait", it does not continue // plugins returns a status other than "Success" or "Wait", it does not continue
// running the remaining plugins and returns an error. Otherwise, if any of the // running the remaining plugins and returns an error. Otherwise, if any of the
// plugins returns "Wait", then this function will block for the timeout period // plugins returns "Wait", then this function will create and add waiting pod
// returned by the plugin, if the time expires, then it will return an error. // to a map of currently waiting pods and return status with "Wait" code.
// Note that if multiple plugins asked to wait, then we wait for the minimum // Pod will remain waiting pod for the minimum duration returned by the permit plugins.
// timeout duration.
func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
@ -750,7 +749,7 @@ func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod
status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName) status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
if !status.IsSuccess() { if !status.IsSuccess() {
if status.IsUnschedulable() { if status.IsUnschedulable() {
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message()) msg := fmt.Sprintf("rejected pod %q by permit plugin %q: %v", pod.Name, pl.Name(), status.Message())
klog.V(4).Infof(msg) klog.V(4).Infof(msg)
return NewStatus(status.Code(), msg) return NewStatus(status.Code(), msg)
} }
@ -768,29 +767,13 @@ func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod
} }
} }
} }
// We now wait for the minimum duration if at least one plugin asked to
// wait (and no plugin rejected the pod)
if statusCode == Wait { if statusCode == Wait {
startTime := time.Now() waitingPod := newWaitingPod(pod, pluginsWaitTime)
w := newWaitingPod(pod, pluginsWaitTime) f.waitingPods.add(waitingPod)
f.waitingPods.add(w) msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
defer f.waitingPods.remove(pod.UID)
klog.V(4).Infof("waiting for pod %q at permit", pod.Name)
s := <-w.s
metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))
if !s.IsSuccess() {
if s.IsUnschedulable() {
msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message())
klog.V(4).Infof(msg) klog.V(4).Infof(msg)
return NewStatus(s.Code(), msg) return NewStatus(Wait, msg)
} }
msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}
return nil return nil
} }
@ -804,6 +787,32 @@ func (f *framework) runPermitPlugin(ctx context.Context, pl PermitPlugin, state
return status, timeout return status, timeout
} }
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
func (f *framework) WaitOnPermit(ctx context.Context, pod *v1.Pod) (status *Status) {
waitingPod := f.waitingPods.get(pod.UID)
if waitingPod == nil {
return nil
}
defer f.waitingPods.remove(pod.UID)
klog.V(4).Infof("pod %q waiting on permit", pod.Name)
startTime := time.Now()
s := <-waitingPod.s
metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))
if !s.IsSuccess() {
if s.IsUnschedulable() {
msg := fmt.Sprintf("pod %q rejected while waiting on permit: %v", pod.Name, s.Message())
klog.V(4).Infof(msg)
return NewStatus(s.Code(), msg)
}
msg := fmt.Sprintf("error received while waiting on permit for pod %q: %v", pod.Name, s.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
return nil
}
// SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo // SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo
// snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains // snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains
// unchanged until a pod finishes "Reserve". There is no guarantee that the information // unchanged until a pod finishes "Reserve". There is no guarantee that the information
@ -819,7 +828,10 @@ func (f *framework) IterateOverWaitingPods(callback func(WaitingPod)) {
// GetWaitingPod returns a reference to a WaitingPod given its UID. // GetWaitingPod returns a reference to a WaitingPod given its UID.
func (f *framework) GetWaitingPod(uid types.UID) WaitingPod { func (f *framework) GetWaitingPod(uid types.UID) WaitingPod {
return f.waitingPods.get(uid) if wp := f.waitingPods.get(uid); wp != nil {
return wp
}
return nil // Returning nil instead of *waitingPod(nil).
} }
// RejectWaitingPod rejects a WaitingPod given its UID. // RejectWaitingPod rejects a WaitingPod given its UID.

View File

@ -1217,7 +1217,7 @@ func TestPermitPlugins(t *testing.T) {
inj: injectedResult{PermitStatus: int(Unschedulable)}, inj: injectedResult{PermitStatus: int(Unschedulable)},
}, },
}, },
want: NewStatus(Unschedulable, `rejected by "TestPlugin" at permit: injected status`), want: NewStatus(Unschedulable, `rejected pod "" by permit plugin "TestPlugin": injected status`),
}, },
{ {
name: "ErrorPermitPlugin", name: "ErrorPermitPlugin",
@ -1237,7 +1237,7 @@ func TestPermitPlugins(t *testing.T) {
inj: injectedResult{PermitStatus: int(UnschedulableAndUnresolvable)}, inj: injectedResult{PermitStatus: int(UnschedulableAndUnresolvable)},
}, },
}, },
want: NewStatus(UnschedulableAndUnresolvable, `rejected by "TestPlugin" at permit: injected status`), want: NewStatus(UnschedulableAndUnresolvable, `rejected pod "" by permit plugin "TestPlugin": injected status`),
}, },
{ {
name: "WaitPermitPlugin", name: "WaitPermitPlugin",
@ -1247,7 +1247,7 @@ func TestPermitPlugins(t *testing.T) {
inj: injectedResult{PermitStatus: int(Wait)}, inj: injectedResult{PermitStatus: int(Wait)},
}, },
}, },
want: NewStatus(Unschedulable, `pod "" rejected while waiting at permit: rejected due to timeout after waiting 0s at plugin TestPlugin`), want: NewStatus(Wait, `one or more plugins asked to wait and no plugin rejected pod ""`),
}, },
{ {
name: "SuccessSuccessPermitPlugin", name: "SuccessSuccessPermitPlugin",
@ -1425,6 +1425,13 @@ func TestRecordingMetrics(t *testing.T) {
wantExtensionPoint: "Permit", wantExtensionPoint: "Permit",
wantStatus: Error, wantStatus: Error,
}, },
{
name: "Permit - Wait",
action: func(f Framework) { f.RunPermitPlugins(context.Background(), state, pod, "") },
inject: injectedResult{PermitStatus: int(Wait)},
wantExtensionPoint: "Permit",
wantStatus: Wait,
},
} }
for _, tt := range tests { for _, tt := range tests {
@ -1578,17 +1585,17 @@ func TestRunBindPlugins(t *testing.T) {
} }
} }
func TestPermitWaitingMetric(t *testing.T) { func TestPermitWaitDurationMetric(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
inject injectedResult inject injectedResult
wantRes string wantRes string
}{ }{
{ {
name: "Permit - Success", name: "WaitOnPermit - No Wait",
}, },
{ {
name: "Permit - Wait Timeout", name: "WaitOnPermit - Wait Timeout",
inject: injectedResult{PermitStatus: int(Wait)}, inject: injectedResult{PermitStatus: int(Wait)},
wantRes: "Unschedulable", wantRes: "Unschedulable",
}, },
@ -1617,13 +1624,14 @@ func TestPermitWaitingMetric(t *testing.T) {
} }
f.RunPermitPlugins(context.TODO(), nil, pod, "") f.RunPermitPlugins(context.TODO(), nil, pod, "")
f.WaitOnPermit(context.TODO(), pod)
collectAndComparePermitWaitDuration(t, tt.wantRes) collectAndComparePermitWaitDuration(t, tt.wantRes)
}) })
} }
} }
func TestRejectWaitingPod(t *testing.T) { func TestWaitOnPermit(t *testing.T) {
pod := &v1.Pod{ pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "pod", Name: "pod",
@ -1631,6 +1639,32 @@ func TestRejectWaitingPod(t *testing.T) {
}, },
} }
tests := []struct {
name string
action func(f Framework)
wantStatus Code
wantMessage string
}{
{
name: "Reject Waiting Pod",
action: func(f Framework) {
f.GetWaitingPod(pod.UID).Reject("reject message")
},
wantStatus: Unschedulable,
wantMessage: "pod \"pod\" rejected while waiting on permit: reject message",
},
{
name: "Allow Waiting Pod",
action: func(f Framework) {
f.GetWaitingPod(pod.UID).Allow(permitPlugin)
},
wantStatus: Success,
wantMessage: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testPermitPlugin := &TestPermitPlugin{} testPermitPlugin := &TestPermitPlugin{}
r := make(Registry) r := make(Registry)
r.Register(permitPlugin, r.Register(permitPlugin,
@ -1646,18 +1680,24 @@ func TestRejectWaitingPod(t *testing.T) {
t.Fatalf("Failed to create framework for testing: %v", err) t.Fatalf("Failed to create framework for testing: %v", err)
} }
go func() { runPermitPluginsStatus := f.RunPermitPlugins(context.Background(), nil, pod, "")
for { if runPermitPluginsStatus.Code() != Wait {
waitingPod := f.GetWaitingPod(pod.UID) t.Fatalf("Expected RunPermitPlugins to return status %v, but got %v",
if waitingPod != nil { Wait, runPermitPluginsStatus.Code())
break
} }
go tt.action(f)
waitOnPermitStatus := f.WaitOnPermit(context.Background(), pod)
if waitOnPermitStatus.Code() != tt.wantStatus {
t.Fatalf("Expected WaitOnPermit to return status %v, but got %v",
tt.wantStatus, waitOnPermitStatus.Code())
} }
f.RejectWaitingPod(pod.UID) if waitOnPermitStatus.Message() != tt.wantMessage {
}() t.Fatalf("Expected WaitOnPermit to return status with message %q, but got %q",
permitStatus := f.RunPermitPlugins(context.Background(), nil, pod, "") tt.wantMessage, waitOnPermitStatus.Message())
if permitStatus.Message() != "pod \"pod\" rejected while waiting at permit: removed" { }
t.Fatalf("RejectWaitingPod failed, permitStatus: %v", permitStatus) })
} }
} }

View File

@ -468,12 +468,14 @@ type Framework interface {
// RunPermitPlugins runs the set of configured permit plugins. If any of these // RunPermitPlugins runs the set of configured permit plugins. If any of these
// plugins returns a status other than "Success" or "Wait", it does not continue // plugins returns a status other than "Success" or "Wait", it does not continue
// running the remaining plugins and returns an error. Otherwise, if any of the // running the remaining plugins and returns an error. Otherwise, if any of the
// plugins returns "Wait", then this function will block for the timeout period // plugins returns "Wait", then this function will create and add waiting pod
// returned by the plugin, if the time expires, then it will return an error. // to a map of currently waiting pods and return status with "Wait" code.
// Note that if multiple plugins asked to wait, then we wait for the minimum // Pod will remain waiting pod for the minimum duration returned by the permit plugins.
// timeout duration.
RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
// RunBindPlugins runs the set of configured bind plugins. A bind plugin may choose // RunBindPlugins runs the set of configured bind plugins. A bind plugin may choose
// whether or not to handle the given Pod. If a bind plugin chooses to skip the // whether or not to handle the given Pod. If a bind plugin chooses to skip the
// binding, it should return code=5("skip") status. Otherwise, it should return "Error" // binding, it should return code=5("skip") status. Otherwise, it should return "Error"
@ -497,9 +499,9 @@ type Framework interface {
type FrameworkHandle interface { type FrameworkHandle interface {
// SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot // SnapshotSharedLister returns listers from the latest NodeInfo Snapshot. The snapshot
// is taken at the beginning of a scheduling cycle and remains unchanged until // is taken at the beginning of a scheduling cycle and remains unchanged until
// a pod finishes "Reserve" point. There is no guarantee that the information // a pod finishes "Permit" point. There is no guarantee that the information
// remains unchanged in the binding phase of scheduling, so plugins in the binding // remains unchanged in the binding phase of scheduling, so plugins in the binding
// cycle(permit/pre-bind/bind/post-bind/un-reserve plugin) should not use it, // cycle (pre-bind/bind/post-bind/un-reserve plugin) should not use it,
// otherwise a concurrent read/write error might occur, they should use scheduler // otherwise a concurrent read/write error might occur, they should use scheduler
// cache instead. // cache instead.
SnapshotSharedLister() schedulerlisters.SharedLister SnapshotSharedLister() schedulerlisters.SharedLister

View File

@ -27,19 +27,19 @@ import (
// waitingPodsMap a thread-safe map used to maintain pods waiting in the permit phase. // waitingPodsMap a thread-safe map used to maintain pods waiting in the permit phase.
type waitingPodsMap struct { type waitingPodsMap struct {
pods map[types.UID]WaitingPod pods map[types.UID]*waitingPod
mu sync.RWMutex mu sync.RWMutex
} }
// newWaitingPodsMap returns a new waitingPodsMap. // newWaitingPodsMap returns a new waitingPodsMap.
func newWaitingPodsMap() *waitingPodsMap { func newWaitingPodsMap() *waitingPodsMap {
return &waitingPodsMap{ return &waitingPodsMap{
pods: make(map[types.UID]WaitingPod), pods: make(map[types.UID]*waitingPod),
} }
} }
// add a new WaitingPod to the map. // add a new WaitingPod to the map.
func (m *waitingPodsMap) add(wp WaitingPod) { func (m *waitingPodsMap) add(wp *waitingPod) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
m.pods[wp.GetPod().UID] = wp m.pods[wp.GetPod().UID] = wp
@ -53,11 +53,10 @@ func (m *waitingPodsMap) remove(uid types.UID) {
} }
// get a WaitingPod from the map. // get a WaitingPod from the map.
func (m *waitingPodsMap) get(uid types.UID) WaitingPod { func (m *waitingPodsMap) get(uid types.UID) *waitingPod {
m.mu.RLock() m.mu.RLock()
defer m.mu.RUnlock() defer m.mu.RUnlock()
return m.pods[uid] return m.pods[uid]
} }
// iterate acquires a read lock and iterates over the WaitingPods map. // iterate acquires a read lock and iterates over the WaitingPods map.
@ -77,11 +76,17 @@ type waitingPod struct {
mu sync.RWMutex mu sync.RWMutex
} }
var _ WaitingPod = &waitingPod{}
// newWaitingPod returns a new waitingPod instance. // newWaitingPod returns a new waitingPod instance.
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod { func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
wp := &waitingPod{ wp := &waitingPod{
pod: pod, pod: pod,
s: make(chan *Status), // Allow() and Reject() calls are non-blocking. This property is guaranteed
// by using non-blocking send to this channel. This channel has a buffer of size 1
// to ensure that non-blocking send will not be ignored - possible situation when
// receiving from this channel happens after non-blocking send.
s: make(chan *Status, 1),
} }
wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime)) wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))

View File

@ -218,7 +218,7 @@ var (
&metrics.HistogramOpts{ &metrics.HistogramOpts{
Subsystem: SchedulerSubsystem, Subsystem: SchedulerSubsystem,
Name: "permit_wait_duration_seconds", Name: "permit_wait_duration_seconds",
Help: "Duration of waiting in RunPermitPlugins.", Help: "Duration of waiting on permit.",
Buckets: metrics.ExponentialBuckets(0.001, 2, 15), Buckets: metrics.ExponentialBuckets(0.001, 2, 15),
StabilityLevel: metrics.ALPHA, StabilityLevel: metrics.ALPHA,
}, },

View File

@ -639,6 +639,27 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
return return
} }
// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
var reason string
if runPermitStatus.IsUnschedulable() {
metrics.PodScheduleFailures.Inc()
reason = v1.PodReasonUnschedulable
} else {
metrics.PodScheduleErrors.Inc()
reason = SchedulerError
}
if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
}
// One of the plugins returned status different than success or wait.
fwk.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message())
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above). // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() { go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx) bindingCycleCtx, cancel := context.WithCancel(ctx)
@ -646,11 +667,10 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
metrics.SchedulerGoroutines.WithLabelValues("binding").Inc() metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec() defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
// Run "permit" plugins. waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
permitStatus := fwk.RunPermitPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) if !waitOnPermitStatus.IsSuccess() {
if !permitStatus.IsSuccess() {
var reason string var reason string
if permitStatus.IsUnschedulable() { if waitOnPermitStatus.IsUnschedulable() {
metrics.PodScheduleFailures.Inc() metrics.PodScheduleFailures.Inc()
reason = v1.PodReasonUnschedulable reason = v1.PodReasonUnschedulable
} else { } else {
@ -662,7 +682,7 @@ func (sched *Scheduler) scheduleOne(ctx context.Context) {
} }
// trigger un-reserve plugins to clean up state associated with the reserved Pod // trigger un-reserve plugins to clean up state associated with the reserved Pod
fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) fwk.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
sched.recordSchedulingFailure(assumedPodInfo, permitStatus.AsError(), reason, permitStatus.Message()) sched.recordSchedulingFailure(assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message())
return return
} }