Merge pull request #118551 from sanposhiho/event-to-register

feature(scheduler): implement ClusterEventWithHint to filter out useless events
Kubernetes Prow Robot committed via GitHub on 2023-06-26 06:41:45 -07:00
29 changed files with 1281 additions and 511 deletions

@@ -106,9 +106,13 @@ type SchedulingQueue interface {
Pop() (*framework.QueuedPodInfo, error)
Update(logger klog.Logger, oldPod, newPod *v1.Pod) error
Delete(pod *v1.Pod) error
MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, preCheck PreEnqueueCheck)
// TODO(sanposhiho): move all PreEnqueueCheck to Requeue and delete it from this parameter eventually.
// Some PreEnqueueChecks include event-filtering logic based on some in-tree plugins,
// and that adversely affects other plugins.
// See https://github.com/kubernetes/kubernetes/issues/110175
MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck)
AssignedPodAdded(logger klog.Logger, pod *v1.Pod)
AssignedPodUpdated(logger klog.Logger, pod *v1.Pod)
AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod)
PendingPods() ([]*v1.Pod, string)
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
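
For illustration, a minimal sketch of a caller using the widened signature (onNodeUpdate and its event wiring are hypothetical; in-tree, the scheduler's event handlers construct the ClusterEvent for each informer callback):

// Hypothetical informer update callback: both the old and new objects are
// forwarded so that QueueingHintFns can inspect what actually changed.
func onNodeUpdate(logger klog.Logger, q SchedulingQueue, oldObj, newObj interface{}) {
	event := framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Update, Label: "NodeUpdate"}
	// preCheck is nil; see the TODO above about phasing PreEnqueueCheck out.
	q.MoveAllToActiveOrBackoffQueue(logger, event, oldObj, newObj, nil)
}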
@@ -171,9 +175,10 @@ type PriorityQueue struct {
// when we received a move request.
moveRequestCycle int64
clusterEventMap map[framework.ClusterEvent]sets.Set[string]
// preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins.
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
// queueingHintMap is keyed with profile name, valued with registered queueing hint functions.
queueingHintMap QueueingHintMapPerProfile
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
@@ -186,6 +191,12 @@ type PriorityQueue struct {
pluginMetricsSamplePercent int
}
// QueueingHintFunction is the wrapper of QueueingHintFn that has PluginName.
type QueueingHintFunction struct {
PluginName string
QueueingHintFn framework.QueueingHintFn
}
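
For example, a hypothetical QueueingHintFn matching this signature (the plugin and its logic are illustrative, not an in-tree plugin):

// isSchedulableAfterNodeLabelChange sketches a hint for an imaginary plugin
// whose Filter result can only be flipped by a change to a node's labels.
func isSchedulableAfterNodeLabelChange(pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
	oldNode, okOld := oldObj.(*v1.Node)
	newNode, okNew := newObj.(*v1.Node)
	if !okNew {
		// Unexpected object type; requeue conservatively rather than drop the event.
		return framework.QueueAfterBackoff
	}
	if okOld && reflect.DeepEqual(oldNode.Labels, newNode.Labels) {
		// Labels didn't change, so this event cannot make the Pod schedulable.
		return framework.QueueSkip
	}
	return framework.QueueAfterBackoff
}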
type priorityQueueOptions struct {
clock clock.Clock
podInitialBackoffDuration time.Duration
@@ -194,8 +205,8 @@ type priorityQueueOptions struct {
podLister listersv1.PodLister
metricsRecorder metrics.MetricAsyncRecorder
pluginMetricsSamplePercent int
clusterEventMap map[framework.ClusterEvent]sets.Set[string]
preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin
queueingHintMap QueueingHintMapPerProfile
}
// Option configures a PriorityQueue
@@ -229,13 +240,6 @@ func WithPodLister(pl listersv1.PodLister) Option {
}
}
// WithClusterEventMap sets clusterEventMap for PriorityQueue.
func WithClusterEventMap(m map[framework.ClusterEvent]sets.Set[string]) Option {
return func(o *priorityQueueOptions) {
o.clusterEventMap = m
}
}
// WithPodMaxInUnschedulablePodsDuration sets podMaxInUnschedulablePodsDuration for PriorityQueue.
func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
return func(o *priorityQueueOptions) {
@@ -243,6 +247,19 @@ func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
}
}
// QueueingHintMapPerProfile is keyed with profile name, valued with queueing hint map registered for the profile.
type QueueingHintMapPerProfile map[string]QueueingHintMap
// QueueingHintMap is keyed with ClusterEvent, valued with queueing hint functions registered for the event.
type QueueingHintMap map[framework.ClusterEvent][]*QueueingHintFunction
// WithQueueingHintMapPerProfile sets queueingHintMap for PriorityQueue.
func WithQueueingHintMapPerProfile(m QueueingHintMapPerProfile) Option {
return func(o *priorityQueueOptions) {
o.queueingHintMap = m
}
}
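
Putting it together, a sketch of registering the hypothetical hint above for the default profile (lessFn and informerFactory are assumed to be in scope; in-tree, this map is built from each plugin's event registrations rather than by hand):

hints := QueueingHintMapPerProfile{
	"default-scheduler": QueueingHintMap{
		{Resource: framework.Node, ActionType: framework.Update}: {
			{PluginName: "NodeLabelGate", QueueingHintFn: isSchedulableAfterNodeLabelChange},
		},
	},
}
q := NewPriorityQueue(lessFn, informerFactory, WithQueueingHintMapPerProfile(hints))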
// WithPreEnqueuePluginMap sets preEnqueuePluginMap for PriorityQueue.
func WithPreEnqueuePluginMap(m map[string][]framework.PreEnqueuePlugin) Option {
return func(o *priorityQueueOptions) {
@@ -314,8 +331,8 @@ func NewPriorityQueue(
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
moveRequestCycle: -1,
clusterEventMap: options.clusterEventMap,
preEnqueuePluginMap: options.preEnqueuePluginMap,
queueingHintMap: options.queueingHintMap,
metricsRecorder: options.metricsRecorder,
pluginMetricsSamplePercent: options.pluginMetricsSamplePercent,
}
@@ -336,6 +353,62 @@ func (p *PriorityQueue) Run(logger klog.Logger) {
}, 30*time.Second, p.stop)
}
// isPodWorthRequeuing calls the QueueingHintFns of only the plugins registered in pInfo.UnschedulablePlugins.
// If any QueueingHintFn returns QueueImmediately, the scheduling queue is supposed to enqueue this Pod to activeQ.
// If no QueueingHintFn returns QueueImmediately, but some return QueueAfterBackoff,
// the scheduling queue is supposed to enqueue this Pod to activeQ/backoffQ depending on the remaining backoff time of the Pod.
// If all QueueingHintFns return QueueSkip, the scheduling queue enqueues the Pod back to the unschedulable Pod pool
// because no plugin's scheduling result would change due to the event.
func (p *PriorityQueue) isPodWorthRequeuing(logger klog.Logger, pInfo *framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{}) framework.QueueingHint {
if pInfo.UnschedulablePlugins.Len() == 0 {
logger.V(6).Info("Worth requeuing because no unschedulable plugins", "pod", klog.KObj(pInfo.Pod))
return framework.QueueAfterBackoff
}
if event.IsWildCard() {
logger.V(6).Info("Worth requeuing because the event is wildcard", "pod", klog.KObj(pInfo.Pod))
return framework.QueueAfterBackoff
}
hintMap, ok := p.queueingHintMap[pInfo.Pod.Spec.SchedulerName]
if !ok {
// shouldn't reach here unless there is a bug.
logger.Error(nil, "No QueueingHintMap is registered for this profile", "profile", pInfo.Pod.Spec.SchedulerName, "pod", klog.KObj(pInfo.Pod))
return framework.QueueAfterBackoff
}
pod := pInfo.Pod
queueHint := framework.QueueSkip
for eventToMatch, hintfns := range hintMap {
if eventToMatch.Resource != event.Resource || eventToMatch.ActionType&event.ActionType == 0 {
continue
}
for _, hintfn := range hintfns {
if !pInfo.UnschedulablePlugins.Has(hintfn.PluginName) {
continue
}
h := hintfn.QueueingHintFn(pod, oldObj, newObj)
if h == framework.QueueSkip {
continue
}
if h == framework.QueueImmediately {
return h
}
// Replace queueHint with the returned value,
// but keep checking the other QueueingHintFns because another plugin may return QueueImmediately.
queueHint = h
}
}
// Either no queueing hint function is registered for this event,
// or none of them returned a value other than QueueSkip.
return queueHint
}
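
The eventToMatch comparison above relies on ActionType being a bit field: the ANDed value is non-zero exactly when a registered (possibly composite) ActionType overlaps the incoming event's. A worked example:

// Assuming the framework's ActionType bit constants:
registered := framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}
incoming := framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Update}

match := registered.Resource == incoming.Resource &&
	registered.ActionType&incoming.ActionType != 0 // true: Update overlaps Add|Update
// By contrast, framework.Update&framework.Delete == 0, so those never match.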
// runPreEnqueuePlugins iterates over the PreEnqueue function of each registered PreEnqueuePlugin.
// It returns true if all PreEnqueue functions run successfully; otherwise it returns false
// upon the first failure.
@@ -532,7 +605,6 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
p.unschedulablePods.addOrUpdate(pInfo)
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", unschedulablePods)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
}
p.addNominatedPodUnlocked(logger, pInfo.PodInfo, nil)
@@ -587,7 +659,7 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) {
}
if len(podsToMove) > 0 {
p.movePodsToActiveOrBackoffQueue(logger, podsToMove, UnschedulableTimeout)
p.movePodsToActiveOrBackoffQueue(logger, podsToMove, UnschedulableTimeout, nil, nil)
}
}
@@ -716,7 +788,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) {
p.lock.Lock()
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodAdd)
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodAdd, nil, pod)
p.lock.Unlock()
}
@@ -736,12 +808,12 @@ func isPodResourcesResizedDown(pod *v1.Pod) bool {
// AssignedPodUpdated is called when a bound pod is updated. Change of labels
// may make pending pods with matching affinity terms schedulable.
func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, pod *v1.Pod) {
func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) {
p.lock.Lock()
if isPodResourcesResizedDown(pod) {
p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, nil)
if isPodResourcesResizedDown(newPod) {
p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, oldPod, newPod, nil)
} else {
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodUpdate)
p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, newPod), AssignedPodUpdate, oldPod, newPod)
}
p.lock.Unlock()
}
@@ -751,54 +823,75 @@ func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, pod *v1.Pod) {
// This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives the signal after all the pods are in the
// queue and the head is the highest priority pod.
func (p *PriorityQueue) moveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, preCheck PreEnqueueCheck) {
func (p *PriorityQueue) moveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) {
unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulablePods.podInfoMap))
for _, pInfo := range p.unschedulablePods.podInfoMap {
if preCheck == nil || preCheck(pInfo.Pod) {
unschedulablePods = append(unschedulablePods, pInfo)
}
}
p.movePodsToActiveOrBackoffQueue(logger, unschedulablePods, event)
p.movePodsToActiveOrBackoffQueue(logger, unschedulablePods, event, oldObj, newObj)
}
// MoveAllToActiveOrBackoffQueue moves all pods from unschedulablePods to activeQ or backoffQ.
// This function adds all pods and then signals the condition variable to ensure that
// if Pop() is waiting for an item, it receives the signal after all the pods are in the
// queue and the head is the highest priority pod.
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, preCheck PreEnqueueCheck) {
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(logger klog.Logger, event framework.ClusterEvent, oldObj, newObj interface{}, preCheck PreEnqueueCheck) {
p.lock.Lock()
defer p.lock.Unlock()
p.moveAllToActiveOrBackoffQueue(logger, event, preCheck)
p.moveAllToActiveOrBackoffQueue(logger, event, oldObj, newObj, preCheck)
}
// requeuePodViaQueueingHint tries to requeue the Pod to activeQ, backoffQ, or the unschedulable pod pool based on schedulingHint.
// It returns the name of the queue the Pod goes to.
//
// NOTE: this function assumes the lock has been acquired by the caller
func (p *PriorityQueue) requeuePodViaQueueingHint(logger klog.Logger, pInfo *framework.QueuedPodInfo, schedulingHint framework.QueueingHint, event string) string {
if schedulingHint == framework.QueueSkip {
p.unschedulablePods.addOrUpdate(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
return unschedulablePods
}
pod := pInfo.Pod
if p.isPodBackingoff(pInfo) && schedulingHint == framework.QueueAfterBackoff {
if err := p.podBackoffQ.Add(pInfo); err != nil {
logger.Error(err, "Error adding pod to the backoff queue, queue this Pod to unschedulable pod pool", "pod", klog.KObj(pod))
p.unschedulablePods.addOrUpdate(pInfo)
return unschedulablePods
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
return backoffQ
}
if added, _ := p.addToActiveQ(logger, pInfo); added {
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
return activeQ
}
p.unschedulablePods.addOrUpdate(pInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
return unschedulablePods
}
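
In short, the hint-to-queue mapping above can be summarized as follows (a simplified sketch mirroring requeuePodViaQueueingHint, ignoring the fall-back to the unschedulable pool when adding to activeQ fails):

// destinationQueue sketches where a Pod lands for a given hint.
func destinationQueue(hint framework.QueueingHint, backingOff bool) string {
	switch {
	case hint == framework.QueueSkip:
		return unschedulablePods // back to the unschedulable pod pool
	case hint == framework.QueueAfterBackoff && backingOff:
		return backoffQ // wait out the remaining backoff first
	default:
		return activeQ // QueueImmediately, or backoff already expired
	}
}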
// NOTE: this function assumes the lock has been acquired by the caller
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent, oldObj, newObj interface{}) {
activated := false
for _, pInfo := range podInfoList {
// If the event doesn't help make the Pod schedulable, continue.
// Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes
// either that there was some abnormal error, or that scheduling the pod failed due to plugins other than PreFilter, Filter and Permit.
// In that case, it's desirable to move it anyway.
if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {
schedulingHint := p.isPodWorthRequeuing(logger, pInfo, event, oldObj, newObj)
if schedulingHint == framework.QueueSkip {
// The QueueingHintFns determined that this Pod isn't worth putting into activeQ or backoffQ for this event.
logger.V(5).Info("Event is not making pod schedulable", "pod", klog.KObj(pInfo.Pod), "event", event.Label)
continue
}
pod := pInfo.Pod
if p.isPodBackingoff(pInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
logger.Error(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
} else {
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQ)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
p.unschedulablePods.delete(pod, pInfo.Gated)
}
} else {
gated := pInfo.Gated
if added, _ := p.addToActiveQ(logger, pInfo); added {
logger.V(5).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQ)
activated = true
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
p.unschedulablePods.delete(pod, gated)
}
p.unschedulablePods.delete(pInfo.Pod, pInfo.Gated)
queue := p.requeuePodViaQueueingHint(logger, pInfo, schedulingHint, event.Label)
logger.V(4).Info("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event.Label, "queue", queue, "hint", schedulingHint)
if queue == activeQ {
activated = true
}
}
p.moveRequestCycle = p.schedulingCycle
@@ -1148,42 +1241,3 @@ func MakeNextPodFunc(logger klog.Logger, queue SchedulingQueue) func() *framewor
func podInfoKeyFunc(obj interface{}) (string, error) {
return cache.MetaNamespaceKeyFunc(obj.(*framework.QueuedPodInfo).Pod)
}
// Checks if the Pod may become schedulable upon the event.
// This is achieved by looking up the global clusterEventMap registry.
func (p *PriorityQueue) podMatchesEvent(podInfo *framework.QueuedPodInfo, clusterEvent framework.ClusterEvent) bool {
if clusterEvent.IsWildCard() {
return true
}
for evt, nameSet := range p.clusterEventMap {
// Firstly verify if the two ClusterEvents match:
// - either the registered event from plugin side is a WildCardEvent,
// - or the two events have identical Resource fields and *compatible* ActionType.
// Note the ActionTypes don't need to be *identical*. We check if the ANDed value
// is zero or not. In this way, it's easy to tell Update&Delete is not compatible,
// but Update&All is.
evtMatch := evt.IsWildCard() ||
(evt.Resource == clusterEvent.Resource && evt.ActionType&clusterEvent.ActionType != 0)
// Secondly verify the plugin name matches.
// Note that if it doesn't match, we shouldn't continue to search.
if evtMatch && intersect(nameSet, podInfo.UnschedulablePlugins) {
return true
}
}
return false
}
func intersect(x, y sets.Set[string]) bool {
if len(x) > len(y) {
x, y = y, x
}
for v := range x {
if y.Has(v) {
return true
}
}
return false
}