1
0
mirror of https://github.com/rancher/norman.git synced 2025-05-30 18:55:06 +00:00
norman/controller/generic_controller.go

184 lines
3.8 KiB
Go
Raw Normal View History

2017-11-11 04:44:02 +00:00
package controller
import (
"context"
"fmt"
"sync"
"time"
"github.com/rancher/norman/clientbase"
"github.com/rancher/norman/types"
"github.com/sirupsen/logrus"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
var (
resyncPeriod = 5 * time.Minute
)
type HandlerFunc func(key string) error
type GenericController interface {
Informer() cache.SharedIndexInformer
AddHandler(handler HandlerFunc)
2017-11-28 21:28:25 +00:00
HandlerCount() int
2017-11-11 04:44:02 +00:00
Enqueue(namespace, name string)
2017-11-28 21:28:25 +00:00
Sync(ctx context.Context) error
2017-11-21 23:16:45 +00:00
Start(ctx context.Context, threadiness int) error
2017-11-11 04:44:02 +00:00
}
type genericController struct {
sync.Mutex
informer cache.SharedIndexInformer
handlers []HandlerFunc
queue workqueue.RateLimitingInterface
name string
running bool
2017-11-28 21:28:25 +00:00
synced bool
2017-11-11 04:44:02 +00:00
}
2017-11-13 19:50:25 +00:00
func NewGenericController(name string, objectClient *clientbase.ObjectClient) GenericController {
2017-11-11 04:44:02 +00:00
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: objectClient.List,
WatchFunc: objectClient.Watch,
},
objectClient.Factory.Object(), resyncPeriod, cache.Indexers{})
return &genericController{
informer: informer,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
name),
name: name,
2017-11-13 19:50:25 +00:00
}
2017-11-11 04:44:02 +00:00
}
2017-11-28 21:28:25 +00:00
func (g *genericController) HandlerCount() int {
return len(g.handlers)
}
2017-11-11 04:44:02 +00:00
func (g *genericController) Informer() cache.SharedIndexInformer {
return g.informer
}
func (g *genericController) Enqueue(namespace, name string) {
if namespace == "" {
g.queue.Add(name)
} else {
g.queue.Add(namespace + "/" + name)
}
}
func (g *genericController) AddHandler(handler HandlerFunc) {
g.handlers = append(g.handlers, handler)
}
2017-11-28 21:28:25 +00:00
func (g *genericController) Sync(ctx context.Context) error {
2017-11-11 04:44:02 +00:00
g.Lock()
defer g.Unlock()
2017-11-28 21:28:25 +00:00
return g.sync(ctx)
2017-11-11 04:44:02 +00:00
}
2017-11-28 21:28:25 +00:00
func (g *genericController) sync(ctx context.Context) error {
if g.synced {
return nil
2017-11-11 04:44:02 +00:00
}
defer utilruntime.HandleCrash()
g.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: g.queueObject,
UpdateFunc: func(_, obj interface{}) {
g.queueObject(obj)
},
DeleteFunc: g.queueObject,
})
logrus.Infof("Starting %s Controller", g.name)
go g.informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), g.informer.HasSynced) {
2017-11-28 21:28:25 +00:00
return fmt.Errorf("failed to sync controller %s", g.name)
2017-11-11 04:44:02 +00:00
}
2017-11-28 21:28:25 +00:00
g.synced = true
return nil
}
func (g *genericController) Start(ctx context.Context, threadiness int) error {
g.Lock()
defer g.Unlock()
if !g.synced {
if err := g.sync(ctx); err != nil {
return err
}
}
if !g.running {
go g.run(ctx, threadiness)
}
g.running = true
return nil
}
func (g *genericController) queueObject(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
g.queue.Add(key)
}
}
func (g *genericController) run(ctx context.Context, threadiness int) {
defer utilruntime.HandleCrash()
defer g.queue.ShutDown()
2017-11-11 04:44:02 +00:00
for i := 0; i < threadiness; i++ {
go wait.Until(g.runWorker, time.Second, ctx.Done())
}
<-ctx.Done()
logrus.Infof("Shutting down %s controller", g.name)
}
func (g *genericController) runWorker() {
for g.processNextWorkItem() {
}
}
func (g *genericController) processNextWorkItem() bool {
key, quit := g.queue.Get()
if quit {
return false
}
defer g.queue.Done(key)
// do your work on the key. This method will contains your "do stuff" logic
err := g.syncHandler(key.(string))
if err == nil {
g.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
g.queue.AddRateLimited(key)
return true
}
func (g *genericController) syncHandler(s string) error {
var errs []error
for _, handler := range g.handlers {
if err := handler(s); err != nil {
errs = append(errs, err)
}
}
return types.NewErrors(errs)
}