mirror of https://github.com/rancher/norman.git
Fix goroutine leak on failed sync() or before
A leak would occur if a GenericController was created but Sync() or Start() was never called, or if Sync() failed.
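For context, a minimal standalone sketch (not from this repository, assuming k8s.io/client-go) of the pattern behind the leak: constructing a rate-limiting work queue starts a background goroutine inside client-go's delaying queue, and that goroutine only exits once ShutDown() is called, so a controller that builds its queue in the constructor but is never started or shut down keeps that goroutine alive for the life of the process.

package main

import (
	"fmt"
	"runtime"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	before := runtime.NumGoroutine()

	// Mirrors the old NewGenericController behaviour: the queue is built
	// eagerly, even though nothing ever processes it or shuts it down.
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")

	time.Sleep(100 * time.Millisecond)
	fmt.Printf("goroutines: %d -> %d\n", before, runtime.NumGoroutine())

	// q.ShutDown() is the only way to stop the queue's background goroutine;
	// the leaked controllers never called it.
	_ = q
}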
committed by Craig Jellick
parent 2115b95249
commit 3afadb987c
@@ -77,6 +77,7 @@ type genericController struct {
 	generation int
 	informer   cache.SharedIndexInformer
 	handlers   []*handlerDef
+	preStart   []string
 	queue      workqueue.RateLimitingInterface
 	name       string
 	running    bool
@@ -91,15 +92,8 @@ func NewGenericController(name string, genericClient Backend) GenericController
 		},
 		genericClient.ObjectFactory().Object(), resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
 
-	rl := workqueue.NewMaxOfRateLimiter(
-		workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second),
-		// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
-		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
-	)
-
 	return &genericController{
 		informer: informer,
-		queue:    workqueue.NewNamedRateLimitingQueue(rl, name),
 		name:     name,
 	}
 }
@@ -117,10 +111,14 @@ func (g *genericController) Informer() cache.SharedIndexInformer {
 }
 
 func (g *genericController) Enqueue(namespace, name string) {
-	if namespace == "" {
-		g.queue.Add(name)
+	key := name
+	if namespace != "" {
+		key = namespace + "/" + name
+	}
+	if g.queue == nil {
+		g.preStart = append(g.preStart, key)
 	} else {
-		g.queue.Add(namespace + "/" + name)
+		g.queue.AddRateLimited(key)
 	}
 }
 
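Besides buffering keys in preStart until sync() builds the queue, this hunk also switches Enqueue from Add to AddRateLimited. A standalone sketch (not part of the commit, assuming client-go's workqueue package) of the difference: Add makes a key available to workers immediately, while AddRateLimited consults the queue's rate limiter and only adds the key after the computed delay.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "sketch")
	defer q.ShutDown()

	q.Add("default/immediate")          // ready right away
	q.AddRateLimited("default/delayed") // ready after a short, rate-limited delay

	key, _ := q.Get() // "default/immediate" comes out first
	fmt.Println(key)
	q.Done(key)
}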
@@ -155,11 +153,31 @@ func (g *genericController) Sync(ctx context.Context) error {
 	return g.sync(ctx)
 }
 
-func (g *genericController) sync(ctx context.Context) error {
+func (g *genericController) sync(ctx context.Context) (retErr error) {
 	if g.synced {
 		return nil
 	}
 
+	if g.queue == nil {
+		rl := workqueue.NewMaxOfRateLimiter(
+			workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second),
+			// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
+			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
+		)
+
+		g.queue = workqueue.NewNamedRateLimitingQueue(rl, g.name)
+		for _, key := range g.preStart {
+			g.queue.Add(key)
+		}
+		g.preStart = nil
+
+		defer func() {
+			if retErr != nil {
+				g.queue.ShutDown()
+			}
+		}()
+	}
+
 	defer utilruntime.HandleCrash()
 
 	g.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
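Taken together, the sync() hunk defers queue construction until the controller is actually started and applies a common Go cleanup pattern: resources created inside a function are released in a defer that checks the named return error. A minimal standalone sketch of that pattern (names here are illustrative, not from the repository):

package main

import (
	"errors"

	"k8s.io/client-go/util/workqueue"
)

// startWorkQueue is an illustrative stand-in for sync(): it builds a queue,
// and if the rest of start-up fails it shuts the queue down so its background
// goroutine does not outlive the failed call.
func startWorkQueue(name string, start func() error) (retErr error) {
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name)
	defer func() {
		if retErr != nil {
			q.ShutDown() // releases the queue's goroutine on failure
		}
	}()

	if err := start(); err != nil {
		return err // triggers the deferred ShutDown
	}
	return nil
}

func main() {
	_ = startWorkQueue("example", func() error { return errors.New("boom") })
}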