Store a cluster event to plugin map in SchedulerQueue

Wei Huang
2021-01-28 22:29:10 -08:00
parent 6404eda8de
commit f322019d7a
9 changed files with 546 additions and 53 deletions
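For context, the clusterEventMap introduced in this change associates a framework.ClusterEvent with the set of plugin names that care about that event, so the queue can tell which unschedulable Pods an incoming event might actually help. A minimal sketch of one entry, assuming the framework package's GVK and ActionType constants, with purely illustrative plugin names:

// Illustrative only: these (hypothetical) registrations say that a Node being
// added or updated may unblock Pods rejected by "NodeAffinity" or "NodePorts".
m := map[framework.ClusterEvent]sets.String{
	{Resource: framework.Node, ActionType: framework.Add | framework.Update}: sets.NewString("NodeAffinity", "NodePorts"),
}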


@@ -34,6 +34,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/scheduler/framework"
@@ -142,6 +143,8 @@ type PriorityQueue struct {
// when we received move request.
moveRequestCycle int64
+clusterEventMap map[framework.ClusterEvent]sets.String
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
closed bool
@@ -152,6 +155,7 @@ type priorityQueueOptions struct {
podInitialBackoffDuration time.Duration
podMaxBackoffDuration time.Duration
podNominator framework.PodNominator
+clusterEventMap map[framework.ClusterEvent]sets.String
}
// Option configures a PriorityQueue
@@ -185,6 +189,13 @@ func WithPodNominator(pn framework.PodNominator) Option {
}
}
+// WithClusterEventMap sets clusterEventMap for PriorityQueue.
+func WithClusterEventMap(m map[framework.ClusterEvent]sets.String) Option {
+return func(o *priorityQueueOptions) {
+o.clusterEventMap = m
+}
+}
var defaultPriorityQueueOptions = priorityQueueOptions{
clock: util.RealClock{},
podInitialBackoffDuration: DefaultPodInitialBackoffDuration,
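A usage sketch for the new option, assuming the queue is built with NewPriorityQueue's variadic Option parameters as elsewhere in this file (lessFn and m are placeholders supplied by the caller):

// Sketch: wire the event-to-plugins map into the queue at construction time.
// m would normally be assembled from the plugins' registered cluster events.
q := NewPriorityQueue(lessFn, WithClusterEventMap(m))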
@@ -195,11 +206,12 @@ var defaultPriorityQueueOptions = priorityQueueOptions{
var _ SchedulingQueue = &PriorityQueue{}
// newQueuedPodInfoForLookup builds a QueuedPodInfo object for a lookup in the queue.
-func newQueuedPodInfoForLookup(pod *v1.Pod) *framework.QueuedPodInfo {
+func newQueuedPodInfoForLookup(pod *v1.Pod, plugins ...string) *framework.QueuedPodInfo {
// Since this is only used for a lookup in the queue, we only need to set the Pod,
// and so we avoid creating a full PodInfo, which is expensive to instantiate frequently.
return &framework.QueuedPodInfo{
-PodInfo: &framework.PodInfo{Pod: pod},
+PodInfo:              &framework.PodInfo{Pod: pod},
+UnschedulablePlugins: sets.NewString(plugins...),
}
}
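For illustration, such a lookup-only wrapper is what the internal heaps expect as a key. A hypothetical call site, assuming the heap's Get mirrors the cache.Store-style (item, exists, err) return:

// Sketch: probe the active queue for an existing entry keyed by this Pod.
if _, exists, _ := p.activeQ.Get(newQueuedPodInfoForLookup(pod)); exists {
	// the Pod is already waiting in the active queue
}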
@@ -232,6 +244,7 @@ func NewPriorityQueue(
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
unschedulableQ: newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
moveRequestCycle: -1,
+clusterEventMap: options.clusterEventMap,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
@@ -509,7 +522,16 @@ func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
// NOTE: this function assumes the lock has been acquired by the caller
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event string) {
+moved := false
for _, pInfo := range podInfoList {
+// If the event doesn't help make the Pod schedulable, skip it.
+// Note: we skip this check when pInfo.UnschedulablePlugins is empty, which means
+// either some abnormal error occurred, or the Pod failed to schedule because of
+// plugins other than PreFilter, Filter and Permit. In that case, move it anyway.
+if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {
+continue
+}
+moved = true
pod := pInfo.Pod
if p.isPodBackingoff(pInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
@@ -528,7 +550,9 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
}
}
p.moveRequestCycle = p.schedulingCycle
-p.cond.Broadcast()
+if moved {
+p.cond.Broadcast()
+}
}
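To make the new filtering concrete, a sketch of a hypothetical event handler, assuming "NodeAdd" is one of the event strings registered in clusterEventReg:

// Sketch: a Pod rejected by (hypothetical) plugin "NodeAffinity" sits in
// unschedulableQ with UnschedulablePlugins = {"NodeAffinity"}. On a Node add
// event, the handler calls:
q.MoveAllToActiveOrBackoffQueue("NodeAdd")
// The Pod is re-queued only if some clusterEventMap entry matching a Node add
// lists "NodeAffinity"; otherwise it stays put, and if nothing moved at all,
// no cond.Broadcast() is issued.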
// getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
@@ -625,12 +649,13 @@ func (p *PriorityQueue) NumUnschedulablePods() int {
}
// newQueuedPodInfo builds a QueuedPodInfo object.
-func (p *PriorityQueue) newQueuedPodInfo(pod *v1.Pod) *framework.QueuedPodInfo {
+func (p *PriorityQueue) newQueuedPodInfo(pod *v1.Pod, plugins ...string) *framework.QueuedPodInfo {
now := p.clock.Now()
return &framework.QueuedPodInfo{
PodInfo: framework.NewPodInfo(pod),
Timestamp: now,
InitialAttemptTimestamp: now,
+UnschedulablePlugins: sets.NewString(plugins...),
}
}
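Illustrative use of the new variadic parameter, assuming the caller passes the names of the plugins that rejected the Pod in the failed scheduling attempt (plugin names here are placeholders):

// Sketch: record which plugins found the Pod unschedulable so that later
// cluster events can be matched against them via podMatchesEvent.
pInfo := p.newQueuedPodInfo(pod, "NodeResourcesFit", "NodeAffinity")
// pInfo.UnschedulablePlugins now contains {"NodeResourcesFit", "NodeAffinity"}.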
@@ -820,3 +845,46 @@ func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
func podInfoKeyFunc(obj interface{}) (string, error) {
return cache.MetaNamespaceKeyFunc(obj.(*framework.QueuedPodInfo).Pod)
}
+// podMatchesEvent checks whether the Pod may become schedulable upon the event.
+// It first resolves the event string via the global clusterEventReg registry, and
+// then looks the resulting ClusterEvent up in the queue's clusterEventMap.
+func (p *PriorityQueue) podMatchesEvent(podInfo *framework.QueuedPodInfo, event string) bool {
+clusterEvent, ok := clusterEventReg[event]
+if !ok {
+return false
+}
+if clusterEvent == framework.WildCardEvent {
+return true
+}
+for evt, nameSet := range p.clusterEventMap {
+// First, verify that the two ClusterEvents match:
+// - either the event registered by the plugin is a WildCardEvent,
+// - or the two events have identical Resource fields and *compatible* ActionTypes.
+// The ActionTypes don't need to be *identical*: we check whether their ANDed value
+// is non-zero. This makes it easy to tell that Update&Delete are not compatible,
+// while Update&All are.
+evtMatch := evt == framework.WildCardEvent ||
+(evt.Resource == clusterEvent.Resource && evt.ActionType&clusterEvent.ActionType != 0)
+// Second, verify that the plugin name matches: only when both the event and a
+// plugin name match do we report the Pod as potentially schedulable.
+if evtMatch && intersect(nameSet, podInfo.UnschedulablePlugins) {
+return true
+}
+}
+return false
+}
+func intersect(x, y sets.String) bool {
+if len(x) > len(y) {
+x, y = y, x
+}
+for v := range x {
+if y.Has(v) {
+return true
+}
+}
+return false
+}
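The ActionType compatibility rule used above (a non-zero bitwise AND) can be demonstrated in isolation. The constants below are local stand-ins for the framework's bit flags, not the real values:

package main

import "fmt"

// ActionType mimics the framework's bit-flag style, for illustration only.
type ActionType uint64

const (
	Add ActionType = 1 << iota
	Update
	Delete
	All = Add | Update | Delete
)

// compatible reports whether two ActionTypes share at least one bit, mirroring
// the evt.ActionType&clusterEvent.ActionType != 0 check in podMatchesEvent.
func compatible(a, b ActionType) bool { return a&b != 0 }

func main() {
	fmt.Println(compatible(Update, Delete)) // false: no shared bits
	fmt.Println(compatible(Update, All))    // true: All includes Update
}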