PreEnqueue implementation

- Add PreEnqueuePlugin to Scheduler Framework
- Implement PreEnqueuePlugin in scheduler queue
- Implementation of SchedulingGates plugin
- Metrics
This commit is contained in:
Wei Huang
2022-11-07 14:02:22 -08:00
parent 2de75d92bf
commit 0b27f25252
17 changed files with 530 additions and 70 deletions

View File

@@ -27,6 +27,7 @@ limitations under the License.
package queue
import (
"context"
"fmt"
"reflect"
"sync"
@@ -63,6 +64,8 @@ const (
activeQName = "Active"
backoffQName = "Backoff"
unschedulablePods = "Unschedulable"
preEnqueue = "PreEnqueue"
)
const (
@@ -172,6 +175,8 @@ type PriorityQueue struct {
moveRequestCycle int64
clusterEventMap map[framework.ClusterEvent]sets.String
// preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins.
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
@@ -187,6 +192,7 @@ type priorityQueueOptions struct {
podMaxInUnschedulablePodsDuration time.Duration
podNominator framework.PodNominator
clusterEventMap map[framework.ClusterEvent]sets.String
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
}
// Option configures a PriorityQueue
@@ -234,6 +240,13 @@ func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
}
}
// WithPreEnqueuePluginMap sets preEnqueuePluginMap for PriorityQueue.
func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option {
return func(o *priorityQueueOptions) {
o.preEnqueuePluginMap = m
}
}
var defaultPriorityQueueOptions = priorityQueueOptions{
clock: clock.RealClock{},
podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
@@ -283,9 +296,10 @@ func NewPriorityQueue(
podMaxBackoffDuration: options.podMaxBackoffDuration,
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
moveRequestCycle: -1,
clusterEventMap: options.clusterEventMap,
preEnqueuePluginMap: options.preEnqueuePluginMap,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
@@ -300,19 +314,66 @@ func (p *PriorityQueue) Run() {
go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}
// runPreEnqueuePlugins iterates PreEnqueue function in each registered PreEnqueuePlugin.
// It returns true if all PreEnqueue function run successfully; otherwise returns false
// upon the first failure.
// Note: we need to associate the failed plugin to `pInfo`, so that the pod can be moved back
// to activeQ by related cluster event.
func (p *PriorityQueue) runPreEnqueuePlugins(ctx context.Context, pInfo *framework.QueuedPodInfo) bool {
var s *framework.Status
pod := pInfo.Pod
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preEnqueue, s.Code().String(), pod.Spec.SchedulerName).Observe(metrics.SinceInSeconds(startTime))
}()
for _, pl := range p.preEnqueuePluginMap[pod.Spec.SchedulerName] {
s = pl.PreEnqueue(ctx, pod)
if s.IsSuccess() {
continue
}
pInfo.UnschedulablePlugins.Insert(pl.Name())
metrics.UnschedulableReason(pl.Name(), pod.Spec.SchedulerName).Inc()
if s.Code() == framework.Error {
klog.ErrorS(s.AsError(), "Unexpected error running PreEnqueue plugin", "pod", klog.KObj(pod), "plugin", pl.Name())
} else {
klog.V(5).InfoS("Status after running PreEnqueue plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", s)
}
return false
}
return true
}
// addToActiveQ tries to add pod to active queue. It returns 2 parameters:
// 1. a boolean flag to indicate whether the pod is added successfully.
// 2. an error for the caller to act on.
func (p *PriorityQueue) addToActiveQ(pInfo *framework.QueuedPodInfo) (bool, error) {
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
if pInfo.Gated {
// Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
p.unschedulablePods.addOrUpdate(pInfo)
return false, nil
}
if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))
return false, err
}
return true, nil
}
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
pInfo := p.newQueuedPodInfo(pod)
if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pod))
if added, err := p.addToActiveQ(pInfo); !added {
return err
}
if p.unschedulablePods.get(pod) != nil {
klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
p.unschedulablePods.delete(pod)
p.unschedulablePods.delete(pInfo)
}
// Delete pod from backoffQ if it is backing off
if err := p.podBackoffQ.Delete(pInfo); err == nil {
@@ -367,11 +428,10 @@ func (p *PriorityQueue) activate(pod *v1.Pod) bool {
return false
}
if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
if added, _ := p.addToActiveQ(pInfo); !added {
return false
}
p.unschedulablePods.delete(pod)
p.unschedulablePods.delete(pInfo)
p.podBackoffQ.Delete(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", ForceActivate).Inc()
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
@@ -446,8 +506,9 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
if rawPodInfo == nil {
break
}
pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
if p.isPodBackingoff(rawPodInfo.(*framework.QueuedPodInfo)) {
pInfo := rawPodInfo.(*framework.QueuedPodInfo)
pod := pInfo.Pod
if p.isPodBackingoff(pInfo) {
break
}
_, err := p.podBackoffQ.Pop()
@@ -455,10 +516,11 @@ func (p *PriorityQueue) flushBackoffQCompleted() {
klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
break
}
p.activeQ.Add(rawPodInfo)
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
activated = true
if added, _ := p.addToActiveQ(pInfo); added {
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
activated = true
}
}
if activated {
@@ -560,13 +622,13 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
if err := p.podBackoffQ.Add(pInfo); err != nil {
return err
}
p.unschedulablePods.delete(usPodInfo.Pod)
p.unschedulablePods.delete(usPodInfo)
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", PodUpdate, "queue", backoffQName)
} else {
if err := p.activeQ.Add(pInfo); err != nil {
if added, err := p.addToActiveQ(pInfo); !added {
return err
}
p.unschedulablePods.delete(usPodInfo.Pod)
p.unschedulablePods.delete(usPodInfo)
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", BackoffComplete, "queue", activeQName)
p.cond.Broadcast()
}
@@ -579,7 +641,7 @@ func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
}
// If pod is not in any of the queues, we put it in the active queue.
pInfo := p.newQueuedPodInfo(newPod)
if err := p.activeQ.Add(pInfo); err != nil {
if added, err := p.addToActiveQ(pInfo); !added {
return err
}
p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
@@ -594,10 +656,11 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
p.lock.Lock()
defer p.lock.Unlock()
p.PodNominator.DeleteNominatedPodIfExists(pod)
if err := p.activeQ.Delete(newQueuedPodInfoForLookup(pod)); err != nil {
pInfo := newQueuedPodInfoForLookup(pod)
if err := p.activeQ.Delete(pInfo); err != nil {
// The item was probably not found in the activeQ.
p.podBackoffQ.Delete(newQueuedPodInfoForLookup(pod))
p.unschedulablePods.delete(pod)
p.podBackoffQ.Delete(pInfo)
p.unschedulablePods.delete(pInfo)
}
return nil
}
@@ -652,16 +715,14 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
} else {
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
p.unschedulablePods.delete(pod)
p.unschedulablePods.delete(pInfo)
}
} else {
if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
} else {
if added, _ := p.addToActiveQ(pInfo); added {
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName)
activated = true
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
p.unschedulablePods.delete(pod)
p.unschedulablePods.delete(pInfo)
}
}
}
@@ -806,25 +867,33 @@ type UnschedulablePods struct {
// podInfoMap is a map key by a pod's full-name and the value is a pointer to the QueuedPodInfo.
podInfoMap map[string]*framework.QueuedPodInfo
keyFunc func(*v1.Pod) string
// metricRecorder updates the counter when elements of an unschedulablePodsMap
// get added or removed, and it does nothing if it's nil
metricRecorder metrics.MetricRecorder
// unschedulableRecorder/gatedRecorder updates the counter when elements of an unschedulablePodsMap
// get added or removed, and it does nothing if it's nil.
unschedulableRecorder, gatedRecorder metrics.MetricRecorder
}
// Add adds a pod to the unschedulable podInfoMap.
func (u *UnschedulablePods) addOrUpdate(pInfo *framework.QueuedPodInfo) {
podID := u.keyFunc(pInfo.Pod)
if _, exists := u.podInfoMap[podID]; !exists && u.metricRecorder != nil {
u.metricRecorder.Inc()
if _, exists := u.podInfoMap[podID]; !exists {
if pInfo.Gated && u.gatedRecorder != nil {
u.gatedRecorder.Inc()
} else if !pInfo.Gated && u.unschedulableRecorder != nil {
u.unschedulableRecorder.Inc()
}
}
u.podInfoMap[podID] = pInfo
}
// Delete deletes a pod from the unschedulable podInfoMap.
func (u *UnschedulablePods) delete(pod *v1.Pod) {
podID := u.keyFunc(pod)
if _, exists := u.podInfoMap[podID]; exists && u.metricRecorder != nil {
u.metricRecorder.Dec()
func (u *UnschedulablePods) delete(pInfo *framework.QueuedPodInfo) {
podID := u.keyFunc(pInfo.Pod)
if _, exists := u.podInfoMap[podID]; exists {
if pInfo.Gated && u.gatedRecorder != nil {
u.gatedRecorder.Dec()
} else if !pInfo.Gated && u.unschedulableRecorder != nil {
u.unschedulableRecorder.Dec()
}
}
delete(u.podInfoMap, podID)
}
@@ -842,17 +911,21 @@ func (u *UnschedulablePods) get(pod *v1.Pod) *framework.QueuedPodInfo {
// Clear removes all the entries from the unschedulable podInfoMap.
func (u *UnschedulablePods) clear() {
u.podInfoMap = make(map[string]*framework.QueuedPodInfo)
if u.metricRecorder != nil {
u.metricRecorder.Clear()
if u.unschedulableRecorder != nil {
u.unschedulableRecorder.Clear()
}
if u.gatedRecorder != nil {
u.gatedRecorder.Clear()
}
}
// newUnschedulablePods initializes a new object of UnschedulablePods.
func newUnschedulablePods(metricRecorder metrics.MetricRecorder) *UnschedulablePods {
func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRecorder) *UnschedulablePods {
return &UnschedulablePods{
podInfoMap: make(map[string]*framework.QueuedPodInfo),
keyFunc: util.GetPodFullName,
metricRecorder: metricRecorder,
podInfoMap: make(map[string]*framework.QueuedPodInfo),
keyFunc: util.GetPodFullName,
unschedulableRecorder: unschedulableRecorder,
gatedRecorder: gatedRecorder,
}
}