1
0
mirror of https://github.com/rancher/norman.git synced 2025-06-21 13:07:10 +00:00
norman/controller/generic_controller.go
Bill Maxwell de9510b940 Default to noop metric provider
With the built-in Prometheus metrics provider, the k8s machinery doesn't
deregister metrics when controllers are removed. So over time as
things like clusters are created or removed the metrics are not
cleaned up. The metrics types for the cache and queue are also
very large. They can take ~1GB of RAM in a 100 cluster setup.

Also, Rancher is not exposing these stats so they are unobservable.
2018-07-27 12:50:44 -07:00

283 lines
6.2 KiB
Go

package controller
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
errors2 "github.com/pkg/errors"
"github.com/rancher/norman/objectclient"
"github.com/rancher/norman/types"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// MetricsEnv is the name of the environment variable consulted by init:
// unless it is set to exactly "true", init calls DisableAllControllerMetrics.
const MetricsEnv = "NORMAN_QUEUE_METRICS"
// resyncPeriod is the resync interval passed to the shared index informer
// created by NewGenericController.
var resyncPeriod = 2 * time.Hour
// init overrides the metrics providers at package load: it calls
// DisableAllControllerMetrics unless the MetricsEnv environment variable is
// set to exactly "true".
func init() {
	if metricsRequested := os.Getenv(MetricsEnv) == "true"; metricsRequested {
		return
	}
	DisableAllControllerMetrics()
}
// HandlerFunc is the callback a controller runs for each key taken off its
// work queue. Keys are either "name" or "namespace/name" (see Enqueue). A
// non-nil error generally causes the key to be retried with rate limiting;
// processNextWorkItem describes the exceptions.
type HandlerFunc func(key string) error
// GenericController is the public surface of a controller that couples a
// shared informer with a work queue and a list of named handlers. The
// per-method notes below describe the genericController implementation in
// this file.
type GenericController interface {
// Informer returns the shared index informer backing the controller.
Informer() cache.SharedIndexInformer
// AddHandler registers a named handler to be called for every processed key.
AddHandler(name string, handler HandlerFunc)
// HandlerCount reports how many handlers have been registered.
HandlerCount() int
// Enqueue adds the key for namespace/name (or just name when namespace is
// empty) to the work queue.
Enqueue(namespace, name string)
// Sync starts the informer and waits for its cache to sync.
Sync(ctx context.Context) error
// Start syncs the controller if that has not happened yet and then launches
// threadiness worker goroutines that run until ctx is done.
Start(ctx context.Context, threadiness int) error
}
// Backend is the client a controller is built on. NewGenericController wires
// List and Watch into the informer's ListWatch and uses
// ObjectFactory().Object() as the informer's example object.
type Backend interface {
// List is used as the informer's ListFunc.
List(opts metav1.ListOptions) (runtime.Object, error)
// Watch is used as the informer's WatchFunc.
Watch(opts metav1.ListOptions) (watch.Interface, error)
// ObjectFactory returns the factory whose Object() value is handed to the
// informer.
ObjectFactory() objectclient.ObjectFactory
}
// handlerDef pairs a registered handler with the name it was registered
// under; the name is used in debug logs and in handlerError messages.
type handlerDef struct {
// name identifies the handler in logs and error messages.
name string
// handler is the callback invoked with each work-queue key.
handler HandlerFunc
}
// genericController is the GenericController implementation returned by
// NewGenericController.
type genericController struct {
// Embedded mutex; Sync and Start hold it while reading and updating synced
// and running.
sync.Mutex
// informer feeds add/update/delete events into queue once sync has run.
informer cache.SharedIndexInformer
// handlers are called in registration order for every processed key.
handlers []handlerDef
// queue holds the keys waiting to be processed by the workers.
queue workqueue.RateLimitingInterface
// name is used for the queue name and in log messages.
name string
// running is set once Start has launched the worker goroutines.
running bool
// synced is set once the informer cache has synced successfully.
synced bool
}
// NewGenericController builds a controller called name on top of
// genericClient. The informer lists and watches through genericClient, uses
// resyncPeriod as its resync interval and is indexed by namespace. The work
// queue is named after the controller and limited by the slower of a per-item
// exponential backoff and an overall token bucket.
func NewGenericController(name string, genericClient Backend) GenericController {
	listWatch := &cache.ListWatch{
		ListFunc:  genericClient.List,
		WatchFunc: genericClient.Watch,
	}
	exampleObject := genericClient.ObjectFactory().Object()
	indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
	sharedInformer := cache.NewSharedIndexInformer(listWatch, exampleObject, resyncPeriod, indexers)

	// Per-item backoff: starts at 500ms and is capped at 1000s.
	perItemBackoff := workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second)
	// 10 qps, 100 bucket size. This is only for retry speed and it's only the
	// overall factor (not per item).
	overallBucket := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}
	limiter := workqueue.NewMaxOfRateLimiter(perItemBackoff, overallBucket)

	return &genericController{
		informer: sharedInformer,
		queue:    workqueue.NewNamedRateLimitingQueue(limiter, name),
		name:     name,
	}
}
// HandlerCount returns the number of handlers registered through AddHandler.
// NOTE(review): handlers is read here without taking the controller mutex;
// confirm callers do not race with AddHandler.
func (g *genericController) HandlerCount() int {
return len(g.handlers)
}
// Informer returns the shared index informer created for this controller in
// NewGenericController.
func (g *genericController) Informer() cache.SharedIndexInformer {
return g.informer
}
// Enqueue adds an object to the work queue. The key is "namespace/name", or
// just name when namespace is empty.
func (g *genericController) Enqueue(namespace, name string) {
	key := name
	if namespace != "" {
		key = namespace + "/" + name
	}
	g.queue.Add(key)
}
// AddHandler appends handler, registered under name, to the list of handlers
// that syncHandler calls for every processed key.
func (g *genericController) AddHandler(name string, handler HandlerFunc) {
	def := handlerDef{name: name, handler: handler}
	g.handlers = append(g.handlers, def)
}
// Sync takes the controller mutex and performs the informer sync (see sync).
// It returns an error when the informer cache does not sync, and nil when the
// controller has already synced.
func (g *genericController) Sync(ctx context.Context) error {
g.Lock()
defer g.Unlock()
return g.sync(ctx)
}
// sync registers the queueing event handlers on the informer, starts the
// informer in its own goroutine and waits for its cache to sync. It does
// nothing once the controller is marked as synced. Callers (Sync and Start)
// hold the controller mutex.
//
// It returns an error if the cache does not sync, which also happens when ctx
// is done first.
func (g *genericController) sync(ctx context.Context) error {
	if g.synced {
		return nil
	}

	defer utilruntime.HandleCrash()

	// Adds, updates and deletes all result in the object's key being queued.
	eventHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    g.queueObject,
		UpdateFunc: func(_, updated interface{}) { g.queueObject(updated) },
		DeleteFunc: g.queueObject,
	}
	g.informer.AddEventHandler(eventHandlers)

	logrus.Debugf("Syncing %s Controller", g.name)

	stop := ctx.Done()
	go g.informer.Run(stop)

	cacheSynced := cache.WaitForCacheSync(stop, g.informer.HasSynced)
	if !cacheSynced {
		return fmt.Errorf("failed to sync controller %s", g.name)
	}

	logrus.Debugf("Syncing %s Controller Done", g.name)
	g.synced = true
	return nil
}
// Start makes sure the controller is synced and then launches the worker
// goroutines. The workers are started only on the first successful call;
// later calls return nil without starting more. A sync failure is returned
// and leaves the controller not running.
func (g *genericController) Start(ctx context.Context, threadiness int) error {
	g.Lock()
	defer g.Unlock()

	// sync returns nil immediately when the controller is already synced.
	if err := g.sync(ctx); err != nil {
		return err
	}

	if g.running {
		return nil
	}

	go g.run(ctx, threadiness)
	g.running = true
	return nil
}
// queueObject is the informer event callback. It derives the object's key
// with cache.DeletionHandlingMetaNamespaceKeyFunc and adds it to the work
// queue; objects for which no key can be derived are skipped.
func (g *genericController) queueObject(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		return
	}
	g.queue.Add(key)
}
// run starts threadiness worker goroutines and then blocks until ctx is done.
// Each worker re-invokes runWorker through wait.Until with a one second
// period until ctx is done. The work queue is shut down when run returns.
func (g *genericController) run(ctx context.Context, threadiness int) {
	defer utilruntime.HandleCrash()
	defer g.queue.ShutDown()

	stop := ctx.Done()
	for started := 0; started < threadiness; started++ {
		go wait.Until(g.runWorker, time.Second, stop)
	}

	<-stop
	logrus.Infof("Shutting down %s controller", g.name)
}
// runWorker processes work items one after another until
// processNextWorkItem reports that the queue has been shut down.
func (g *genericController) runWorker() {
	for {
		if more := g.processNextWorkItem(); !more {
			return
		}
	}
}
// processNextWorkItem takes one key off the work queue, runs every handler
// for it and decides whether the key is forgotten or retried.
//
// The key is forgotten when the handlers returned no error, or when the error
// (after unwrapping a single *handlerError) is a *ForgetError; the latter is
// logged at info level. Any other error re-queues the key with rate limiting
// and is logged at error level, unless filterConflictsError filters it out.
//
// It returns false only when the queue has been shut down.
func (g *genericController) processNextWorkItem() bool {
	item, shutdown := g.queue.Get()
	if shutdown {
		return false
	}
	// Mark the item as done whatever the outcome below.
	defer g.queue.Done(item)

	// Run the handlers for this key.
	syncErr := g.syncHandler(item.(string))

	// Look through a single handlerError wrapper to see what the handler
	// actually returned.
	inner := syncErr
	if wrapped, isHandlerErr := inner.(*handlerError); isHandlerErr {
		inner = wrapped.err
	}
	_, dropped := inner.(*ForgetError)

	if syncErr == nil || dropped {
		if dropped {
			logrus.Infof("%v %v completed with dropped err: %v", g.name, item, syncErr)
		}
		g.queue.Forget(item)
		return true
	}

	if reportable := filterConflictsError(syncErr); reportable != nil {
		logrus.Errorf("%v %v %v", g.name, item, reportable)
	}

	g.queue.AddRateLimited(item)
	return true
}
// ignoreError reports whether err should be kept out of the error log. The
// check is made on the root cause of err (errors2.Cause) and is true for
// Kubernetes conflict errors and for *ForgetError. When checkString is set,
// an error whose message ends with the "please apply your changes to the
// latest version and try again" text is ignored as well.
func ignoreError(err error, checkString bool) bool {
	root := errors2.Cause(err)
	_, isForget := root.(*ForgetError)

	switch {
	case errors.IsConflict(root), isForget:
		return true
	case checkString:
		return strings.HasSuffix(root.Error(), "please apply your changes to the latest version and try again")
	default:
		return false
	}
}
// filterConflictsError removes errors that are expected during normal
// operation so that processNextWorkItem does not log them.
//
// It returns nil when err itself is ignorable (see ignoreError). When the
// root cause of err is a *types.MultiErrors, each member is checked on its
// own, including the message-suffix check, and only the members that are not
// ignorable are kept and recombined with types.NewErrors (which presumably
// yields nil for an empty list; verify against the types package). Any other
// error is returned unchanged.
func filterConflictsError(err error) error {
	if ignoreError(err, false) {
		return nil
	}

	multi, ok := errors2.Cause(err).(*types.MultiErrors)
	if !ok {
		return err
	}

	var kept []error
	for _, member := range multi.Errors {
		if !ignoreError(member, true) {
			// The member itself must be appended: append with no values
			// leaves the slice empty and every real error would be lost.
			kept = append(kept, member)
		}
	}
	return types.NewErrors(kept...)
}
// syncHandler calls every registered handler with key s, in registration
// order. A failing handler does not stop the others: each failure is wrapped
// in a handlerError carrying the handler's name, and the failures are
// combined with types.NewErrors. The deferred utilruntime.RecoverFromPanic
// is given the address of the named result err.
func (g *genericController) syncHandler(s string) (err error) {
	defer utilruntime.RecoverFromPanic(&err)

	var failures []error
	for _, def := range g.handlers {
		logrus.Debugf("%s calling handler %s %s", g.name, def.name, s)

		handlerErr := def.handler(s)
		if handlerErr == nil {
			continue
		}
		failures = append(failures, &handlerError{
			name: def.name,
			err:  handlerErr,
		})
	}

	return types.NewErrors(failures...)
}
// handlerError records which handler produced an error.
type handlerError struct {
	// name is the name the failing handler was registered under.
	name string
	// err is the error the handler returned.
	err error
}

// Cause returns the handler's original error, so that errors2.Cause can look
// through the wrapper.
func (h *handlerError) Cause() error {
	return h.err
}

// Error formats the wrapped error prefixed with the handler name in brackets.
func (h *handlerError) Error() string {
	handlerName, cause := h.name, h.err
	return fmt.Sprintf("[%s] failed with : %v", handlerName, cause)
}