Scheduler now registers event handlers dynamically

- move clusterEventMap to Configurator
- dynamic event handler registration for core API resources
- dynamic event handler registration for custom resources
Author: Wei Huang
Date: 2021-05-21 13:47:06 -07:00
parent fb3273774a
commit 1b3a124ba6
6 changed files with 314 additions and 89 deletions
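For context on the custom-resource path: the new addAllEventHandlers signature in the diff below takes a gvkMap (map[framework.GVK]framework.ActionType) plus a dynamic shared informer factory, and for any GVK it does not recognize it falls back to a dynamic informer keyed by a fully qualified "resource.version.group" string. The following standalone sketch is not part of the commit; it only illustrates that fallback parsing step, reusing the hypothetical foos.v1.example.com resource from the patch's own comments.

package main

import (
    "fmt"
    "strings"

    "k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
    // A custom-resource GVK is written as "<resource plural>.<version>.<group>",
    // so it must contain at least two dots before it can be parsed.
    gvk := "foos.v1.example.com"
    if strings.Count(gvk, ".") < 2 {
        fmt.Println("incorrect event registration:", gvk)
        return
    }
    // ParseResourceArg splits the fully qualified form into a GroupVersionResource.
    gvr, _ := schema.ParseResourceArg(gvk)
    fmt.Printf("group=%q version=%q resource=%q\n", gvr.Group, gvr.Version, gvr.Resource)
    // Prints: group="example.com" version="v1" resource="foos"
    // The patch then obtains an informer for this GVR via
    // dynInformerFactory.ForResource(*gvr).Informer() and registers the
    // generated event handlers on it.
}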


@@ -19,7 +19,10 @@ package scheduler
import (
"fmt"
"reflect"
"strings"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
@@ -39,32 +42,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler/profile"
)
-func (sched *Scheduler) onPvAdd(obj interface{}) {
-// Pods created when there are no PVs available will be stuck in
-// unschedulable queue. But unbound PVs created for static provisioning and
-// delay binding storage class are skipped in PV controller dynamic
-// provisioning and binding process, will not trigger events to schedule pod
-// again. So we need to move pods to active queue on PV add for this
-// scenario.
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd, nil)
-}
-func (sched *Scheduler) onPvUpdate(old, new interface{}) {
-// Scheduler.bindVolumesWorker may fail to update assumed pod volume
-// bindings due to conflicts if PVs are updated by PV controller or other
-// parties, then scheduler will add pod back to unschedulable queue. We
-// need to move pods to active queue on PV update for this scenario.
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate, nil)
-}
-func (sched *Scheduler) onPvcAdd(obj interface{}) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd, nil)
-}
-func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate, nil)
-}
func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
sc, ok := obj.(*storagev1.StorageClass)
if !ok {
@@ -83,18 +60,6 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
}
}
-func (sched *Scheduler) onServiceAdd(obj interface{}) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd, nil)
-}
-func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate, nil)
-}
-func (sched *Scheduler) onServiceDelete(obj interface{}) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete, nil)
-}
func (sched *Scheduler) addNodeToCache(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
@@ -153,14 +118,6 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
}
}
-func (sched *Scheduler) onCSINodeAdd(obj interface{}) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd, nil)
-}
-func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
-sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate, nil)
-}
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
pod := obj.(*v1.Pod)
klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod))
@@ -312,6 +269,8 @@ func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
func addAllEventHandlers(
sched *Scheduler,
informerFactory informers.SharedInformerFactory,
+dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
+gvkMap map[framework.GVK]framework.ActionType,
) {
// scheduled pod cache
informerFactory.Core().V1().Pods().Informer().AddEventHandler(
@@ -372,46 +331,100 @@ func addAllEventHandlers(
},
)
-informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
-cache.ResourceEventHandlerFuncs{
-AddFunc: sched.onCSINodeAdd,
-UpdateFunc: sched.onCSINodeUpdate,
-},
-)
+buildEvtResHandler := func(at framework.ActionType, gvk framework.GVK, shortGVK string) cache.ResourceEventHandlerFuncs {
+funcs := cache.ResourceEventHandlerFuncs{}
+if at&framework.Add != 0 {
+evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Add, Label: fmt.Sprintf("%vAdd", shortGVK)}
+funcs.AddFunc = func(_ interface{}) {
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
+}
+}
+if at&framework.Update != 0 {
+evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Update, Label: fmt.Sprintf("%vUpdate", shortGVK)}
+funcs.UpdateFunc = func(_, _ interface{}) {
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
+}
+}
+if at&framework.Delete != 0 {
+evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Delete, Label: fmt.Sprintf("%vDelete", shortGVK)}
+funcs.DeleteFunc = func(_ interface{}) {
+sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(evt, nil)
+}
+}
+return funcs
+}
-// On add and update of PVs.
-informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
-cache.ResourceEventHandlerFuncs{
+for gvk, at := range gvkMap {
+switch gvk {
+case framework.Node, framework.Pod:
+// Do nothing.
+case framework.CSINode:
+informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
+buildEvtResHandler(at, framework.CSINode, "CSINode"),
+)
+case framework.PersistentVolume:
// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
-AddFunc: sched.onPvAdd,
-UpdateFunc: sched.onPvUpdate,
-},
-)
-// This is for MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound.
-informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
-cache.ResourceEventHandlerFuncs{
-AddFunc: sched.onPvcAdd,
-UpdateFunc: sched.onPvcUpdate,
-},
-)
-// This is for ServiceAffinity: affected by the selector of the service is updated.
-// Also, if new service is added, equivalence cache will also become invalid since
-// existing pods may be "captured" by this service and change this predicate result.
-informerFactory.Core().V1().Services().Informer().AddEventHandler(
-cache.ResourceEventHandlerFuncs{
-AddFunc: sched.onServiceAdd,
-UpdateFunc: sched.onServiceUpdate,
-DeleteFunc: sched.onServiceDelete,
-},
-)
-informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
-cache.ResourceEventHandlerFuncs{
-AddFunc: sched.onStorageClassAdd,
-},
-)
+//
+// PvAdd: Pods created when there are no PVs available will be stuck in
+// unschedulable queue. But unbound PVs created for static provisioning and
+// delay binding storage class are skipped in PV controller dynamic
+// provisioning and binding process, will not trigger events to schedule pod
+// again. So we need to move pods to active queue on PV add for this
+// scenario.
+//
+// PvUpdate: Scheduler.bindVolumesWorker may fail to update assumed pod volume
+// bindings due to conflicts if PVs are updated by PV controller or other
+// parties, then scheduler will add pod back to unschedulable queue. We
+// need to move pods to active queue on PV update for this scenario.
+informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
+buildEvtResHandler(at, framework.PersistentVolume, "Pv"),
+)
+case framework.PersistentVolumeClaim:
+// MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound.
+informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
+buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"),
+)
+case framework.StorageClass:
+if at&framework.Add != 0 {
+informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
+cache.ResourceEventHandlerFuncs{
+AddFunc: sched.onStorageClassAdd,
+},
+)
+}
+case framework.Service:
+// ServiceAffinity: affected by the selector of the service is updated.
+// Also, if new service is added, equivalence cache will also become invalid since
+// existing pods may be "captured" by this service and change this predicate result.
+informerFactory.Core().V1().Services().Informer().AddEventHandler(
+buildEvtResHandler(at, framework.Service, "Service"),
+)
+default:
+// Tests may not instantiate dynInformerFactory.
+if dynInformerFactory == nil {
+continue
+}
+// GVK is expected to be at least 3-folded, separated by dots.
+// <kind in plural>.<version>.<group>
+// Valid examples:
+// - foos.v1.example.com
+// - bars.v1beta1.a.b.c
+// Invalid examples:
+// - foos.v1 (2 sections)
+// - foo.v1.example.com (the first section should be plural)
+if strings.Count(string(gvk), ".") < 2 {
+klog.ErrorS(nil, "incorrect event registration", "gvk", gvk)
+continue
+}
+// Fall back to try dynamic informers.
+gvr, _ := schema.ParseResourceArg(string(gvk))
+dynInformer := dynInformerFactory.ForResource(*gvr).Informer()
+dynInformer.AddEventHandler(
+buildEvtResHandler(at, gvk, strings.Title(gvr.Resource)),
+)
+go dynInformer.Run(sched.StopEverything)
+}
+}
}
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {