Wait for event handlers to sync before running the scheduler

Authored by kidddddddddddddddddddddd on 2023-03-22 10:54:28 +08:00; committed by AxeZhan
parent 6dbb1c6cf0
commit 9c7166ff63
6 changed files with 123 additions and 31 deletions
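
informerFactory.WaitForCacheSync only guarantees that each informer's store has been populated; a handler registered with AddEventHandler can still be part-way through receiving the initial list of notifications when scheduling starts. The sketch below shows the mechanism this commit relies on in isolation — a minimal, runnable example, assuming client-go >= v0.26 (where AddEventHandler returns a cache.ResourceEventHandlerRegistration) and using a fake clientset purely for illustration:

package main

import (
	"context"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// A fake clientset pre-loaded with one pod stands in for a real API server.
	client := fake.NewSimpleClientset(&v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "p1", Namespace: "default"},
	})
	factory := informers.NewSharedInformerFactory(client, 0)

	// Since client-go v0.26, AddEventHandler returns a registration whose
	// HasSynced reports true only once the initial list has been delivered
	// to this particular handler, not just to the informer's store.
	reg, err := factory.Core().V1().Pods().Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) { fmt.Println("initial add delivered") },
		})
	if err != nil {
		panic(err)
	}

	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done()) // store populated, handler possibly not yet

	// The same polling loop this commit's WaitForHandlersSync uses,
	// reduced to a single registration.
	if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true,
		func(context.Context) (bool, error) { return reg.HasSynced(), nil },
	); err != nil {
		panic(err)
	}
	fmt.Println("handler synced; safe to start scheduling")
}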

View File

@@ -208,11 +208,17 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
 		if cc.DynInformerFactory != nil {
 			cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
 		}
+
+		// Wait for all handlers to sync (all items in the initial list delivered) before scheduling.
+		if err := sched.WaitForHandlersSync(ctx); err != nil {
+			logger.Error(err, "waiting for handlers to sync")
+		}
+		logger.V(3).Info("Handlers synced")
 	}
 	if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
 		startInformersAndWaitForSync(ctx)
 	}
 	// If leader election is enabled, runCommand via LeaderElector until done and exit.
 	if cc.LeaderElection != nil {
 		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{

View File

@@ -17,14 +17,17 @@ limitations under the License.
 package scheduler
 
 import (
+	"context"
 	"fmt"
 	"reflect"
 	"strings"
+	"time"
 
 	v1 "k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/dynamic/dynamicinformer"
 	"k8s.io/client-go/informers"
@@ -256,6 +259,24 @@ func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
 	return profiles.HandlesSchedulerName(pod.Spec.SchedulerName)
 }
 
+const (
+	// syncedPollPeriod controls how often WaitForHandlersSync checks the sync state of the registered handlers.
+	syncedPollPeriod = 100 * time.Millisecond
+)
+
+// WaitForHandlersSync waits for all registered EventHandlers to sync.
+// It returns an error if the context is cancelled before all handlers have synced.
+func (sched *Scheduler) WaitForHandlersSync(ctx context.Context) error {
+	return wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(ctx context.Context) (done bool, err error) {
+		for _, handler := range sched.registeredHandlers {
+			if !handler.HasSynced() {
+				return false, nil
+			}
+		}
+		return true, nil
+	})
+}
+
 // addAllEventHandlers is a helper function used in tests and in Scheduler
 // to add event handlers for various informers.
 func addAllEventHandlers(
@@ -263,9 +284,14 @@ func addAllEventHandlers(
 	informerFactory informers.SharedInformerFactory,
 	dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
 	gvkMap map[framework.GVK]framework.ActionType,
-) {
+) error {
+	var (
+		handlerRegistration cache.ResourceEventHandlerRegistration
+		err                 error
+		handlers            []cache.ResourceEventHandlerRegistration
+	)
 	// scheduled pod cache
-	informerFactory.Core().V1().Pods().Informer().AddEventHandler(
+	if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
 		cache.FilteringResourceEventHandler{
 			FilterFunc: func(obj interface{}) bool {
 				switch t := obj.(type) {
@@ -290,9 +316,13 @@ func addAllEventHandlers(
 				DeleteFunc: sched.deletePodFromCache,
 			},
 		},
-	)
+	); err != nil {
+		return err
+	}
+	handlers = append(handlers, handlerRegistration)
+
 	// unscheduled pod queue
-	informerFactory.Core().V1().Pods().Informer().AddEventHandler(
+	if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
 		cache.FilteringResourceEventHandler{
 			FilterFunc: func(obj interface{}) bool {
 				switch t := obj.(type) {
@@ -317,15 +347,21 @@ func addAllEventHandlers(
 				DeleteFunc: sched.deletePodFromSchedulingQueue,
 			},
 		},
-	)
+	); err != nil {
+		return err
+	}
+	handlers = append(handlers, handlerRegistration)
 
-	informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
+	if handlerRegistration, err = informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc:    sched.addNodeToCache,
 			UpdateFunc: sched.updateNodeInCache,
 			DeleteFunc: sched.deleteNodeFromCache,
 		},
-	)
+	); err != nil {
+		return err
+	}
+	handlers = append(handlers, handlerRegistration)
 
 	logger := sched.logger
 	buildEvtResHandler := func(at framework.ActionType, gvk framework.GVK, shortGVK string) cache.ResourceEventHandlerFuncs {
@@ -356,17 +392,26 @@ func addAllEventHandlers(
 		case framework.Node, framework.Pod:
 			// Do nothing.
 		case framework.CSINode:
-			informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
+			if handlerRegistration, err = informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
 				buildEvtResHandler(at, framework.CSINode, "CSINode"),
-			)
+			); err != nil {
+				return err
+			}
+			handlers = append(handlers, handlerRegistration)
 		case framework.CSIDriver:
-			informerFactory.Storage().V1().CSIDrivers().Informer().AddEventHandler(
+			if handlerRegistration, err = informerFactory.Storage().V1().CSIDrivers().Informer().AddEventHandler(
 				buildEvtResHandler(at, framework.CSIDriver, "CSIDriver"),
-			)
+			); err != nil {
+				return err
+			}
+			handlers = append(handlers, handlerRegistration)
 		case framework.CSIStorageCapacity:
-			informerFactory.Storage().V1().CSIStorageCapacities().Informer().AddEventHandler(
+			if handlerRegistration, err = informerFactory.Storage().V1().CSIStorageCapacities().Informer().AddEventHandler(
 				buildEvtResHandler(at, framework.CSIStorageCapacity, "CSIStorageCapacity"),
-			)
+			); err != nil {
+				return err
+			}
+			handlers = append(handlers, handlerRegistration)
 		case framework.PersistentVolume:
 			// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
 			//
@@ -381,42 +426,60 @@ func addAllEventHandlers(
 			// bindings due to conflicts if PVs are updated by PV controller or other
 			// parties, then scheduler will add pod back to unschedulable queue. We
 			// need to move pods to active queue on PV update for this scenario.
-			informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
+			if handlerRegistration, err = informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
 				buildEvtResHandler(at, framework.PersistentVolume, "Pv"),
-			)
+			); err != nil {
+				return err
+			}
+			handlers = append(handlers, handlerRegistration)
 		case framework.PersistentVolumeClaim:
 			// MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound.
-			informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
+			if handlerRegistration, err = informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
 				buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"),
-			)
+			); err != nil {
+				return err
+			}
+			handlers = append(handlers, handlerRegistration)
 		case framework.PodSchedulingContext:
 			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
-				_, _ = informerFactory.Resource().V1alpha2().PodSchedulingContexts().Informer().AddEventHandler(
+				if handlerRegistration, err = informerFactory.Resource().V1alpha2().PodSchedulingContexts().Informer().AddEventHandler(
 					buildEvtResHandler(at, framework.PodSchedulingContext, "PodSchedulingContext"),
-				)
+				); err != nil {
+					return err
+				}
+				handlers = append(handlers, handlerRegistration)
 			}
 		case framework.ResourceClaim:
 			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
-				_, _ = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
+				if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
 					buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
-				)
+				); err != nil {
+					return err
+				}
+				handlers = append(handlers, handlerRegistration)
 			}
 		case framework.StorageClass:
 			if at&framework.Add != 0 {
-				informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
+				if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
 					cache.ResourceEventHandlerFuncs{
 						AddFunc: sched.onStorageClassAdd,
 					},
-				)
+				); err != nil {
+					return err
+				}
+				handlers = append(handlers, handlerRegistration)
 			}
 			if at&framework.Update != 0 {
-				informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
+				if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
 					cache.ResourceEventHandlerFuncs{
 						UpdateFunc: func(old, obj interface{}) {
 							sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, old, obj, nil)
 						},
 					},
-				)
+				); err != nil {
+					return err
+				}
+				handlers = append(handlers, handlerRegistration)
 			}
 		default:
 			// Tests may not instantiate dynInformerFactory.
@@ -438,11 +501,16 @@ func addAllEventHandlers(
 			// Fall back to try dynamic informers.
 			gvr, _ := schema.ParseResourceArg(string(gvk))
 			dynInformer := dynInformerFactory.ForResource(*gvr).Informer()
-			dynInformer.AddEventHandler(
+			if handlerRegistration, err = dynInformer.AddEventHandler(
 				buildEvtResHandler(at, gvk, strings.Title(gvr.Resource)),
-			)
+			); err != nil {
+				return err
+			}
+			handlers = append(handlers, handlerRegistration)
 		}
 	}
+	sched.registeredHandlers = handlers
+	return nil
 }
 
 func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
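
Every registration above follows the same shape: call AddEventHandler, return on error, and remember the cache.ResourceEventHandlerRegistration for the later sync check. A hypothetical helper (not part of this commit) makes that shape explicit for a single informer:

package scheduler

import "k8s.io/client-go/tools/cache"

// registerAll is a hypothetical illustration of the pattern repeated in
// addAllEventHandlers: register each handler, fail fast on error, and keep
// every registration so its HasSynced can be polled before scheduling starts.
func registerAll(informer cache.SharedIndexInformer, hs ...cache.ResourceEventHandler) ([]cache.ResourceEventHandlerRegistration, error) {
	regs := make([]cache.ResourceEventHandlerRegistration, 0, len(hs))
	for _, h := range hs {
		reg, err := informer.AddEventHandler(h)
		if err != nil {
			return nil, err
		}
		regs = append(regs, reg)
	}
	return regs, nil
}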

View File

@@ -448,7 +448,9 @@ func TestAddAllEventHandlers(t *testing.T) {
 			dynclient := dyfake.NewSimpleDynamicClient(scheme)
 			dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)
 
-			addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap)
+			if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap); err != nil {
+				t.Fatalf("Add event handlers failed, error = %v", err)
+			}
 
 			informerFactory.Start(testSched.StopEverything)
 			dynInformerFactory.Start(testSched.StopEverything)

View File

@@ -469,6 +469,9 @@ func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
 	// Run scheduler.
 	informerFactory.Start(ctx.Done())
 	informerFactory.WaitForCacheSync(ctx.Done())
+	if err = sched.WaitForHandlersSync(ctx); err != nil {
+		t.Fatalf("Handlers failed to sync: %v", err)
+	}
 	go sched.Run(ctx)
 
 	// Send pods to be scheduled.

View File

@@ -100,6 +100,9 @@ type Scheduler struct {
 	// otherwise logging functions will access a nil sink and
 	// panic.
 	logger klog.Logger
+
+	// registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start.
+	registeredHandlers []cache.ResourceEventHandlerRegistration
 }
 
 func (sched *Scheduler) applyDefaultHandlers() {
@@ -349,7 +352,9 @@ func New(ctx context.Context,
 	}
 
 	sched.applyDefaultHandlers()
 
-	addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile))
+	if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
+		return nil, fmt.Errorf("adding event handlers: %w", err)
+	}
 
 	return sched, nil
 }
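
cache.ResourceEventHandlerRegistration is a one-method interface (HasSynced() bool), so code that consumes registeredHandlers is easy to exercise without real informers. A hypothetical test double, not part of this commit:

package scheduler

// fakeRegistration is a hypothetical stand-in satisfying
// cache.ResourceEventHandlerRegistration; HasSynced is its only method.
type fakeRegistration struct{ synced bool }

func (f fakeRegistration) HasSynced() bool { return f.synced }

Populating sched.registeredHandlers with such doubles would let WaitForHandlersSync be unit-tested for both the all-synced and the context-cancelled paths.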

View File

@@ -80,6 +80,8 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf
 	evtBroadcaster.StartRecordingToSink(ctx.Done())
 
+	logger := klog.FromContext(ctx)
+
 	sched, err := scheduler.New(
 		ctx,
 		clientSet,
@@ -94,11 +96,17 @@ func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConf
 		scheduler.WithExtenders(cfg.Extenders...),
 		scheduler.WithParallelism(cfg.Parallelism))
 	if err != nil {
-		klog.Fatalf("Error creating scheduler: %v", err)
+		logger.Error(err, "Error creating scheduler")
+		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
 	}
 
 	informerFactory.Start(ctx.Done())
 	informerFactory.WaitForCacheSync(ctx.Done())
+	if err = sched.WaitForHandlersSync(ctx); err != nil {
+		logger.Error(err, "Failed waiting for handlers to sync")
+		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
+	}
+	logger.V(3).Info("Handlers synced")
 	go sched.Run(ctx)
 
 	return sched, informerFactory
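
Taken together, the test helper now mirrors the production ordering in server.go: start informers, wait for the store-level cache sync, wait for per-handler delivery, and only then start the scheduling loop. A condensed, hypothetical wrapper of that ordering (names taken from the surrounding code; not part of the commit):

package util

import (
	"context"
	"fmt"

	"k8s.io/client-go/informers"
	"k8s.io/kubernetes/pkg/scheduler"
)

// startAndRun is a hypothetical condensation of the startup ordering this
// commit enforces: informers first, store sync second, handler sync third,
// and only then the scheduling loop.
func startAndRun(ctx context.Context, sched *scheduler.Scheduler, informerFactory informers.SharedInformerFactory) error {
	informerFactory.Start(ctx.Done())            // begin list/watch for all registered informers
	informerFactory.WaitForCacheSync(ctx.Done()) // informer stores populated
	if err := sched.WaitForHandlersSync(ctx); err != nil {
		return fmt.Errorf("waiting for handlers to sync: %w", err)
	}
	go sched.Run(ctx) // safe: every handler has seen its initial list
	return nil
}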