1
0
mirror of https://github.com/rancher/norman.git synced 2025-04-29 20:04:03 +00:00
norman/controller/generic_controller.go

221 lines
4.8 KiB
Go
Raw Normal View History

2017-11-11 04:44:02 +00:00
package controller
import (
"context"
"fmt"
"sync"
"time"
2018-01-21 06:23:04 +00:00
"github.com/juju/ratelimit"
2017-11-11 04:44:02 +00:00
"github.com/rancher/norman/clientbase"
"github.com/rancher/norman/types"
"github.com/sirupsen/logrus"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// resyncPeriod is the resync interval handed to the shared index informer
// created by NewGenericController.
var resyncPeriod = 2 * time.Hour
// HandlerFunc processes a single work-queue key. Keys are either "name" or
// "namespace/name" (see Enqueue). A non-nil error causes the key to be
// re-queued with rate limiting unless the error is a *ForgetError.
type HandlerFunc func(key string) error
type GenericController interface {
Informer() cache.SharedIndexInformer
2018-01-16 05:08:36 +00:00
AddHandler(name string, handler HandlerFunc)
2017-11-28 21:28:25 +00:00
HandlerCount() int
2017-11-11 04:44:02 +00:00
Enqueue(namespace, name string)
2017-11-28 21:28:25 +00:00
Sync(ctx context.Context) error
2017-11-21 23:16:45 +00:00
Start(ctx context.Context, threadiness int) error
2017-11-11 04:44:02 +00:00
}
2018-01-16 05:08:36 +00:00
// handlerDef pairs a registered handler with the name it was registered
// under; the name is attached to any error the handler returns.
type handlerDef struct {
	handler HandlerFunc
	name    string
}
2017-11-11 04:44:02 +00:00
type genericController struct {
sync.Mutex
informer cache.SharedIndexInformer
2018-01-16 05:08:36 +00:00
handlers []handlerDef
2017-11-11 04:44:02 +00:00
queue workqueue.RateLimitingInterface
name string
running bool
2017-11-28 21:28:25 +00:00
synced bool
2017-11-11 04:44:02 +00:00
}
2017-11-13 19:50:25 +00:00
func NewGenericController(name string, objectClient *clientbase.ObjectClient) GenericController {
2017-11-11 04:44:02 +00:00
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: objectClient.List,
WatchFunc: objectClient.Watch,
},
objectClient.Factory.Object(), resyncPeriod, cache.Indexers{})
2018-01-21 06:23:04 +00:00
rl := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))},
)
2017-11-11 04:44:02 +00:00
return &genericController{
informer: informer,
2018-01-21 06:23:04 +00:00
queue: workqueue.NewNamedRateLimitingQueue(rl, name),
name: name,
2017-11-13 19:50:25 +00:00
}
2017-11-11 04:44:02 +00:00
}
2017-11-28 21:28:25 +00:00
// HandlerCount returns the number of handlers registered via AddHandler.
func (g *genericController) HandlerCount() int {
	count := len(g.handlers)
	return count
}
2017-11-11 04:44:02 +00:00
// Informer exposes the shared index informer this controller was built with.
func (g *genericController) Informer() cache.SharedIndexInformer {
	informer := g.informer
	return informer
}
// Enqueue adds an object to the work queue. The key is "namespace/name",
// or just "name" when namespace is empty.
func (g *genericController) Enqueue(namespace, name string) {
	key := name
	if namespace != "" {
		key = namespace + "/" + name
	}
	g.queue.Add(key)
}
2018-01-16 05:08:36 +00:00
func (g *genericController) AddHandler(name string, handler HandlerFunc) {
g.handlers = append(g.handlers, handlerDef{
name: name,
handler: handler,
})
2017-11-11 04:44:02 +00:00
}
2017-11-28 21:28:25 +00:00
func (g *genericController) Sync(ctx context.Context) error {
2017-11-11 04:44:02 +00:00
g.Lock()
defer g.Unlock()
2017-11-28 21:28:25 +00:00
return g.sync(ctx)
2017-11-11 04:44:02 +00:00
}
2017-11-28 21:28:25 +00:00
func (g *genericController) sync(ctx context.Context) error {
if g.synced {
return nil
2017-11-11 04:44:02 +00:00
}
defer utilruntime.HandleCrash()
g.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: g.queueObject,
UpdateFunc: func(_, obj interface{}) {
g.queueObject(obj)
},
DeleteFunc: g.queueObject,
})
2017-11-29 01:24:53 +00:00
logrus.Infof("Syncing %s Controller", g.name)
2017-11-11 04:44:02 +00:00
go g.informer.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), g.informer.HasSynced) {
2017-11-28 21:28:25 +00:00
return fmt.Errorf("failed to sync controller %s", g.name)
2017-11-11 04:44:02 +00:00
}
2017-11-29 01:24:53 +00:00
logrus.Infof("Syncing %s Controller Done", g.name)
2017-11-11 04:44:02 +00:00
2017-11-28 21:28:25 +00:00
g.synced = true
return nil
}
// Start makes sure the informer cache is synced and then launches the worker
// loop with threadiness workers. The workers are launched at most once;
// later calls return nil without starting more.
func (g *genericController) Start(ctx context.Context, threadiness int) error {
	g.Lock()
	defer g.Unlock()

	if !g.synced {
		if err := g.sync(ctx); err != nil {
			return err
		}
	}

	if g.running {
		return nil
	}

	g.running = true
	go g.run(ctx, threadiness)
	return nil
}
// queueObject computes the work-queue key for obj and enqueues it. It is the
// informer event callback for adds, updates and deletes, so it uses the
// deletion-handling key function.
//
// If no key can be derived the object cannot be queued; the failure is
// reported through utilruntime.HandleError instead of being dropped silently.
func (g *genericController) queueObject(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("%v failed to get key for %T: %v", g.name, obj, err))
		return
	}
	g.queue.Add(key)
}
func (g *genericController) run(ctx context.Context, threadiness int) {
defer utilruntime.HandleCrash()
defer g.queue.ShutDown()
2017-11-11 04:44:02 +00:00
for i := 0; i < threadiness; i++ {
go wait.Until(g.runWorker, time.Second, ctx.Done())
}
<-ctx.Done()
logrus.Infof("Shutting down %s controller", g.name)
}
// runWorker processes queue items one after another until
// processNextWorkItem reports that the queue has shut down.
func (g *genericController) runWorker() {
	for {
		if !g.processNextWorkItem() {
			return
		}
	}
}
func (g *genericController) processNextWorkItem() bool {
key, quit := g.queue.Get()
if quit {
return false
}
defer g.queue.Done(key)
// do your work on the key. This method will contains your "do stuff" logic
err := g.syncHandler(key.(string))
2018-01-16 05:08:36 +00:00
checkErr := err
if handlerErr, ok := checkErr.(*handlerError); ok {
checkErr = handlerErr.err
}
if _, ok := checkErr.(*ForgetError); err == nil || ok {
if ok {
logrus.Infof("%v %v completed with dropped err: %v", g.name, key, err)
}
2017-11-11 04:44:02 +00:00
g.queue.Forget(key)
return true
}
2018-01-16 05:08:36 +00:00
utilruntime.HandleError(fmt.Errorf("%v %v %v", g.name, key, err))
2017-11-11 04:44:02 +00:00
g.queue.AddRateLimited(key)
return true
}
2018-01-16 05:08:36 +00:00
func (g *genericController) syncHandler(s string) (err error) {
defer utilruntime.RecoverFromPanic(&err)
2017-11-11 04:44:02 +00:00
var errs []error
for _, handler := range g.handlers {
2018-01-16 05:08:36 +00:00
if err := handler.handler(s); err != nil {
errs = append(errs, &handlerError{
name: handler.name,
err: err,
})
2017-11-11 04:44:02 +00:00
}
}
2018-01-16 05:08:36 +00:00
err = types.NewErrors(errs)
return
}
// handlerError records which named handler produced an error.
type handlerError struct {
	name string
	err  error
}

// Error formats the failure as "[<handler name>] failed with : <cause>".
func (h *handlerError) Error() string {
	const format = "[%s] failed with : %v"
	return fmt.Sprintf(format, h.name, h.err)
}