diff --git a/controller/generic_controller.go b/controller/generic_controller.go
index c80e5353..83513099 100644
--- a/controller/generic_controller.go
+++ b/controller/generic_controller.go
@@ -77,6 +77,7 @@ type genericController struct {
 	generation int
 	informer   cache.SharedIndexInformer
 	handlers   []*handlerDef
+	preStart   []string
 	queue      workqueue.RateLimitingInterface
 	name       string
 	running    bool
@@ -91,15 +92,8 @@ func NewGenericController(name string, genericClient Backend) GenericController
 		},
 		genericClient.ObjectFactory().Object(), resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
 
-	rl := workqueue.NewMaxOfRateLimiter(
-		workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second),
-		// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
-		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
-	)
-
 	return &genericController{
 		informer: informer,
-		queue:    workqueue.NewNamedRateLimitingQueue(rl, name),
 		name:     name,
 	}
 }
@@ -117,10 +111,14 @@ func (g *genericController) Informer() cache.SharedIndexInformer {
 }
 
 func (g *genericController) Enqueue(namespace, name string) {
-	if namespace == "" {
-		g.queue.Add(name)
+	key := name
+	if namespace != "" {
+		key = namespace + "/" + name
+	}
+	if g.queue == nil {
+		g.preStart = append(g.preStart, key)
 	} else {
-		g.queue.Add(namespace + "/" + name)
+		g.queue.AddRateLimited(key)
 	}
 }
 
@@ -155,11 +153,31 @@ func (g *genericController) Sync(ctx context.Context) error {
 	return g.sync(ctx)
 }
 
-func (g *genericController) sync(ctx context.Context) error {
+func (g *genericController) sync(ctx context.Context) (retErr error) {
 	if g.synced {
 		return nil
 	}
 
+	if g.queue == nil {
+		rl := workqueue.NewMaxOfRateLimiter(
+			workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second),
+			// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
+			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
+		)
+
+		g.queue = workqueue.NewNamedRateLimitingQueue(rl, g.name)
+		for _, key := range g.preStart {
+			g.queue.Add(key)
+		}
+		g.preStart = nil
+
+		defer func() {
+			if retErr != nil {
+				g.queue.ShutDown()
+			}
+		}()
+	}
+
 	defer utilruntime.HandleCrash()
 
 	g.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{