Prevent flutter of CRD APIServices on start

This commit is contained in:
Jordan Liggitt 2017-09-02 13:21:26 -04:00
parent d353adc467
commit 0529dd405b
No known key found for this signature in database
GPG Key ID: 39928704103C7229
2 changed files with 27 additions and 2 deletions

View File

@ -105,8 +105,13 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
autoRegistrationController)
aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go autoRegistrationController.Run(5, context.StopCh)
go crdRegistrationController.Run(5, context.StopCh)
go func() {
// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
crdRegistrationController.WaitForInitialSync()
autoRegistrationController.Run(5, context.StopCh)
}()
return nil
})

View File

@ -53,6 +53,8 @@ type crdRegistrationController struct {
syncHandler func(groupVersion schema.GroupVersion) error
syncedInitialSet chan struct{}
// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors
// this is actually keyed by a groupVersion
queue workqueue.RateLimitingInterface
@ -67,7 +69,8 @@ func NewAutoRegistrationController(crdinformer crdinformers.CustomResourceDefini
crdLister: crdinformer.Lister(),
crdSynced: crdinformer.Informer().HasSynced,
apiServiceRegistration: apiServiceRegistration,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd-autoregister"),
syncedInitialSet: make(chan struct{}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd-autoregister"),
}
c.syncHandler = c.handleVersionUpdate
@ -114,6 +117,18 @@ func (c *crdRegistrationController) Run(threadiness int, stopCh <-chan struct{})
return
}
// process each item in the list once
if crds, err := c.crdLister.List(labels.Everything()); err != nil {
utilruntime.HandleError(err)
} else {
for _, crd := range crds {
if err := c.syncHandler(schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Version}); err != nil {
utilruntime.HandleError(err)
}
}
}
close(c.syncedInitialSet)
// start up your worker threads based on threadiness. Some controllers have multiple kinds of workers
for i := 0; i < threadiness; i++ {
// runWorker will loop until "something bad" happens. The .Until will then rekick the worker
@ -125,6 +140,11 @@ func (c *crdRegistrationController) Run(threadiness int, stopCh <-chan struct{})
<-stopCh
}
// WaitForInitialSync blocks until the initial set of CRD resources has been processed
func (c *crdRegistrationController) WaitForInitialSync() {
<-c.syncedInitialSet
}
func (c *crdRegistrationController) runWorker() {
// hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work
// available, so we don't worry about secondary waits