1
0
mirror of https://github.com/rancher/norman.git synced 2025-09-01 15:18:20 +00:00
This commit is contained in:
Darren Shepherd
2017-11-28 14:28:25 -07:00
parent faa1fb2148
commit 389d27b3e5
28 changed files with 490 additions and 159 deletions

View File

@@ -24,7 +24,9 @@ type HandlerFunc func(key string) error
type GenericController interface {
Informer() cache.SharedIndexInformer
AddHandler(handler HandlerFunc)
HandlerCount() int
Enqueue(namespace, name string)
Sync(ctx context.Context) error
Start(ctx context.Context, threadiness int) error
}
@@ -35,6 +37,7 @@ type genericController struct {
queue workqueue.RateLimitingInterface
name string
running bool
synced bool
}
func NewGenericController(name string, objectClient *clientbase.ObjectClient) GenericController {
@@ -53,6 +56,10 @@ func NewGenericController(name string, objectClient *clientbase.ObjectClient) Ge
}
}
func (g *genericController) HandlerCount() int {
return len(g.handlers)
}
func (g *genericController) Informer() cache.SharedIndexInformer {
return g.informer
}
@@ -69,10 +76,50 @@ func (g *genericController) AddHandler(handler HandlerFunc) {
g.handlers = append(g.handlers, handler)
}
func (g *genericController) Sync(ctx context.Context) error {
g.Lock()
defer g.Unlock()
return g.sync(ctx)
}
func (g *genericController) sync(ctx context.Context) error {
if g.synced {
return nil
}
defer utilruntime.HandleCrash()
g.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: g.queueObject,
UpdateFunc: func(_, obj interface{}) {
g.queueObject(obj)
},
DeleteFunc: g.queueObject,
})
logrus.Infof("Starting %s Controller", g.name)
go g.informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), g.informer.HasSynced) {
return fmt.Errorf("failed to sync controller %s", g.name)
}
g.synced = true
return nil
}
func (g *genericController) Start(ctx context.Context, threadiness int) error {
g.Lock()
defer g.Unlock()
if !g.synced {
if err := g.sync(ctx); err != nil {
return err
}
}
if !g.running {
go g.run(ctx, threadiness)
}
@@ -92,22 +139,6 @@ func (g *genericController) run(ctx context.Context, threadiness int) {
defer utilruntime.HandleCrash()
defer g.queue.ShutDown()
g.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: g.queueObject,
UpdateFunc: func(_, obj interface{}) {
g.queueObject(obj)
},
DeleteFunc: g.queueObject,
})
logrus.Infof("Starting %s Controller", g.name)
go g.informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), g.informer.HasSynced) {
return
}
for i := 0; i < threadiness; i++ {
go wait.Until(g.runWorker, time.Second, ctx.Done())
}