1
0
mirror of https://github.com/rancher/norman.git synced 2025-09-09 19:19:42 +00:00

Handlers return object and error

This commit is contained in:
Darren Shepherd
2018-10-30 09:54:49 -07:00
parent dfeffc8a3f
commit 77869d26b3
4 changed files with 160 additions and 86 deletions

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"reflect"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -41,12 +42,12 @@ func init() {
} }
} }
type HandlerFunc func(key string) error type HandlerFunc func(key string, obj interface{}) (interface{}, error)
type GenericController interface { type GenericController interface {
SetThreadinessOverride(count int) SetThreadinessOverride(count int)
Informer() cache.SharedIndexInformer Informer() cache.SharedIndexInformer
AddHandler(name string, handler HandlerFunc) AddHandler(ctx context.Context, name string, handler HandlerFunc)
HandlerCount() int HandlerCount() int
Enqueue(namespace, name string) Enqueue(namespace, name string)
Sync(ctx context.Context) error Sync(ctx context.Context) error
@@ -60,15 +61,22 @@ type Backend interface {
} }
type handlerDef struct { type handlerDef struct {
name string name string
handler HandlerFunc generation int
handler HandlerFunc
}
type generationKey struct {
generation int
key string
} }
type genericController struct { type genericController struct {
sync.Mutex sync.Mutex
threadinessOverride int threadinessOverride int
generation int
informer cache.SharedIndexInformer informer cache.SharedIndexInformer
handlers []handlerDef handlers []*handlerDef
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
name string name string
running bool running bool
@@ -116,11 +124,28 @@ func (g *genericController) Enqueue(namespace, name string) {
} }
} }
func (g *genericController) AddHandler(name string, handler HandlerFunc) { func (g *genericController) AddHandler(ctx context.Context, name string, handler HandlerFunc) {
g.handlers = append(g.handlers, handlerDef{ g.Lock()
name: name, h := &handlerDef{
handler: handler, name: name,
}) generation: g.generation,
handler: handler,
}
g.handlers = append(g.handlers, h)
g.Unlock()
go func() {
<-ctx.Done()
g.Lock()
var handlers []*handlerDef
for _, handler := range g.handlers {
if handler != h {
handlers = append(handlers, h)
}
}
g.handlers = handlers
g.Unlock()
}()
} }
func (g *genericController) Sync(ctx context.Context) error { func (g *genericController) Sync(ctx context.Context) error {
@@ -175,6 +200,22 @@ func (g *genericController) Start(ctx context.Context, threadiness int) error {
go g.run(ctx, threadiness) go g.run(ctx, threadiness)
} }
if g.running {
for _, h := range g.handlers {
if h.generation != g.generation {
continue
}
for _, key := range g.informer.GetStore().ListKeys() {
g.queueObject(generationKey{
generation: g.generation,
key: key,
})
}
break
}
}
g.generation++
g.running = true g.running = true
return nil return nil
} }
@@ -211,7 +252,7 @@ func (g *genericController) processNextWorkItem() bool {
defer g.queue.Done(key) defer g.queue.Done(key)
// do your work on the key. This method will contains your "do stuff" logic // do your work on the key. This method will contains your "do stuff" logic
err := g.syncHandler(key.(string)) err := g.syncHandler(key)
checkErr := err checkErr := err
if handlerErr, ok := checkErr.(*handlerError); ok { if handlerErr, ok := checkErr.(*handlerError); ok {
checkErr = handlerErr.err checkErr = handlerErr.err
@@ -265,14 +306,39 @@ func filterConflictsError(err error) error {
return err return err
} }
func (g *genericController) syncHandler(s string) (err error) { func (g *genericController) syncHandler(key interface{}) (err error) {
defer utilruntime.RecoverFromPanic(&err) defer utilruntime.RecoverFromPanic(&err)
generation := -1
var s string
var obj interface{}
switch v := key.(type) {
case string:
s = v
case generationKey:
generation = v.generation
s = v.key
default:
return nil
}
obj, exists, err := g.informer.GetStore().GetByKey(s)
if err != nil {
return err
} else if !exists {
obj = nil
}
var errs []error var errs []error
for _, handler := range g.handlers { for _, handler := range g.handlers {
if generation > -1 && handler.generation != generation {
continue
}
logrus.Debugf("%s calling handler %s %s", g.name, handler.name, s) logrus.Debugf("%s calling handler %s %s", g.name, handler.name, s)
metrics.IncTotalHandlerExecution(g.name, handler.name) metrics.IncTotalHandlerExecution(g.name, handler.name)
if err := handler.handler(s); err != nil { if newObj, err := handler.handler(s, obj); err != nil {
if !ignoreError(err, false) { if !ignoreError(err, false) {
metrics.IncTotalHandlerFailure(g.name, handler.name, s) metrics.IncTotalHandlerFailure(g.name, handler.name, s)
} }
@@ -280,6 +346,8 @@ func (g *genericController) syncHandler(s string) (err error) {
name: handler.name, name: handler.name,
err: err, err: err,
}) })
} else if newObj != nil && !reflect.ValueOf(newObj).IsNil() {
obj = newObj
} }
} }
err = types.NewErrors(errs...) err = types.NewErrors(errs...)

View File

@@ -41,7 +41,7 @@ type {{.schema.CodeName}}List struct {
Items []{{.prefix}}{{.schema.CodeName}} Items []{{.prefix}}{{.schema.CodeName}}
} }
type {{.schema.CodeName}}HandlerFunc func(key string, obj *{{.prefix}}{{.schema.CodeName}}) error type {{.schema.CodeName}}HandlerFunc func(key string, obj *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error)
type {{.schema.CodeName}}Lister interface { type {{.schema.CodeName}}Lister interface {
List(namespace string, selector labels.Selector) (ret []*{{.prefix}}{{.schema.CodeName}}, err error) List(namespace string, selector labels.Selector) (ret []*{{.prefix}}{{.schema.CodeName}}, err error)
@@ -52,8 +52,8 @@ type {{.schema.CodeName}}Controller interface {
Generic() controller.GenericController Generic() controller.GenericController
Informer() cache.SharedIndexInformer Informer() cache.SharedIndexInformer
Lister() {{.schema.CodeName}}Lister Lister() {{.schema.CodeName}}Lister
AddHandler(name string, handler {{.schema.CodeName}}HandlerFunc) AddHandler(ctx context.Context, name string, handler {{.schema.CodeName}}HandlerFunc)
AddClusterScopedHandler(name, clusterName string, handler {{.schema.CodeName}}HandlerFunc) AddClusterScopedHandler(ctx context.Context, name, clusterName string, handler {{.schema.CodeName}}HandlerFunc)
Enqueue(namespace, name string) Enqueue(namespace, name string)
Sync(ctx context.Context) error Sync(ctx context.Context) error
Start(ctx context.Context, threadiness int) error Start(ctx context.Context, threadiness int) error
@@ -71,10 +71,10 @@ type {{.schema.CodeName}}Interface interface {
Watch(opts metav1.ListOptions) (watch.Interface, error) Watch(opts metav1.ListOptions) (watch.Interface, error)
DeleteCollection(deleteOpts *metav1.DeleteOptions, listOpts metav1.ListOptions) error DeleteCollection(deleteOpts *metav1.DeleteOptions, listOpts metav1.ListOptions) error
Controller() {{.schema.CodeName}}Controller Controller() {{.schema.CodeName}}Controller
AddHandler(name string, sync {{.schema.CodeName}}HandlerFunc) AddHandler(ctx context.Context, name string, sync {{.schema.CodeName}}HandlerFunc)
AddLifecycle(name string, lifecycle {{.schema.CodeName}}Lifecycle) AddLifecycle(ctx context.Context, name string, lifecycle {{.schema.CodeName}}Lifecycle)
AddClusterScopedHandler(name, clusterName string, sync {{.schema.CodeName}}HandlerFunc) AddClusterScopedHandler(ctx context.Context, name, clusterName string, sync {{.schema.CodeName}}HandlerFunc)
AddClusterScopedLifecycle(name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle) AddClusterScopedLifecycle(ctx context.Context, name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle)
} }
type {{.schema.ID}}Lister struct { type {{.schema.ID}}Lister struct {
@@ -123,34 +123,27 @@ func (c *{{.schema.ID}}Controller) Lister() {{.schema.CodeName}}Lister {
} }
func (c *{{.schema.ID}}Controller) AddHandler(name string, handler {{.schema.CodeName}}HandlerFunc) { func (c *{{.schema.ID}}Controller) AddHandler(ctx context.Context, name string, handler {{.schema.CodeName}}HandlerFunc) {
c.GenericController.AddHandler(name, func(key string) error { c.GenericController.AddHandler(ctx, name, func(key string, obj interface{}) (interface{}, error) {
obj, exists, err := c.Informer().GetStore().GetByKey(key) if obj == nil {
if err != nil {
return err
}
if !exists {
return handler(key, nil) return handler(key, nil)
} else if v, ok := obj.(*{{.prefix}}{{.schema.CodeName}}); ok {
return handler(key, v)
} else {
return nil, nil
} }
return handler(key, obj.(*{{.prefix}}{{.schema.CodeName}}))
}) })
} }
func (c *{{.schema.ID}}Controller) AddClusterScopedHandler(name, cluster string, handler {{.schema.CodeName}}HandlerFunc) { func (c *{{.schema.ID}}Controller) AddClusterScopedHandler(ctx context.Context, name, cluster string, handler {{.schema.CodeName}}HandlerFunc) {
c.GenericController.AddHandler(name, func(key string) error { c.GenericController.AddHandler(ctx, name, func(key string, obj interface{}) (interface{}, error) {
obj, exists, err := c.Informer().GetStore().GetByKey(key) if obj == nil {
if err != nil {
return err
}
if !exists {
return handler(key, nil) return handler(key, nil)
} else if v, ok := obj.(*{{.prefix}}{{.schema.CodeName}}); ok && controller.ObjectInCluster(cluster, obj) {
return handler(key, v)
} else {
return nil, nil
} }
if !controller.ObjectInCluster(cluster, obj) {
return nil
}
return handler(key, obj.(*{{.prefix}}{{.schema.CodeName}}))
}) })
} }
@@ -245,21 +238,21 @@ func (s *{{.schema.ID}}Client) DeleteCollection(deleteOpts *metav1.DeleteOptions
return s.objectClient.DeleteCollection(deleteOpts, listOpts) return s.objectClient.DeleteCollection(deleteOpts, listOpts)
} }
func (s *{{.schema.ID}}Client) AddHandler(name string, sync {{.schema.CodeName}}HandlerFunc) { func (s *{{.schema.ID}}Client) AddHandler(ctx context.Context, name string, sync {{.schema.CodeName}}HandlerFunc) {
s.Controller().AddHandler(name, sync) s.Controller().AddHandler(ctx, name, sync)
} }
func (s *{{.schema.ID}}Client) AddLifecycle(name string, lifecycle {{.schema.CodeName}}Lifecycle) { func (s *{{.schema.ID}}Client) AddLifecycle(ctx context.Context, name string, lifecycle {{.schema.CodeName}}Lifecycle) {
sync := New{{.schema.CodeName}}LifecycleAdapter(name, false, s, lifecycle) sync := New{{.schema.CodeName}}LifecycleAdapter(name, false, s, lifecycle)
s.AddHandler(name, sync) s.Controller().AddHandler(ctx, name, sync)
} }
func (s *{{.schema.ID}}Client) AddClusterScopedHandler(name, clusterName string, sync {{.schema.CodeName}}HandlerFunc) { func (s *{{.schema.ID}}Client) AddClusterScopedHandler(ctx context.Context, name, clusterName string, sync {{.schema.CodeName}}HandlerFunc) {
s.Controller().AddClusterScopedHandler(name, clusterName, sync) s.Controller().AddClusterScopedHandler(ctx, name, clusterName, sync)
} }
func (s *{{.schema.ID}}Client) AddClusterScopedLifecycle(name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle) { func (s *{{.schema.ID}}Client) AddClusterScopedLifecycle(ctx context.Context, name, clusterName string, lifecycle {{.schema.CodeName}}Lifecycle) {
sync := New{{.schema.CodeName}}LifecycleAdapter(name+"_"+clusterName, true, s, lifecycle) sync := New{{.schema.CodeName}}LifecycleAdapter(name+"_"+clusterName, true, s, lifecycle)
s.AddClusterScopedHandler(name, clusterName, sync) s.Controller().AddClusterScopedHandler(ctx, name, clusterName, sync)
} }
` `

View File

@@ -5,6 +5,7 @@ var lifecycleTemplate = `package {{.schema.Version.Version}}
import ( import (
{{.importPackage}} {{.importPackage}}
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"github.com/rancher/norman/controller"
"github.com/rancher/norman/lifecycle" "github.com/rancher/norman/lifecycle"
) )
@@ -45,11 +46,12 @@ func (w *{{.schema.ID}}LifecycleAdapter) Updated(obj runtime.Object) (runtime.Ob
func New{{.schema.CodeName}}LifecycleAdapter(name string, clusterScoped bool, client {{.schema.CodeName}}Interface, l {{.schema.CodeName}}Lifecycle) {{.schema.CodeName}}HandlerFunc { func New{{.schema.CodeName}}LifecycleAdapter(name string, clusterScoped bool, client {{.schema.CodeName}}Interface, l {{.schema.CodeName}}Lifecycle) {{.schema.CodeName}}HandlerFunc {
adapter := &{{.schema.ID}}LifecycleAdapter{lifecycle: l} adapter := &{{.schema.ID}}LifecycleAdapter{lifecycle: l}
syncFn := lifecycle.NewObjectLifecycleAdapter(name, clusterScoped, adapter, client.ObjectClient()) syncFn := lifecycle.NewObjectLifecycleAdapter(name, clusterScoped, adapter, client.ObjectClient())
return func(key string, obj *{{.prefix}}{{.schema.CodeName}}) error { return func(key string, obj *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) {
if obj == nil { newObj, err := syncFn(key, obj)
return syncFn(key, nil) if o, ok := newObj.(*{{.prefix}}{{.schema.CodeName}}); ok {
return o, err
} }
return syncFn(key, obj) return nil, err
} }
} }
` `

View File

@@ -30,7 +30,7 @@ type objectLifecycleAdapter struct {
objectClient *objectclient.ObjectClient objectClient *objectclient.ObjectClient
} }
func NewObjectLifecycleAdapter(name string, clusterScoped bool, lifecycle ObjectLifecycle, objectClient *objectclient.ObjectClient) func(key string, obj runtime.Object) error { func NewObjectLifecycleAdapter(name string, clusterScoped bool, lifecycle ObjectLifecycle, objectClient *objectclient.ObjectClient) func(key string, obj interface{}) (interface{}, error) {
o := objectLifecycleAdapter{ o := objectLifecycleAdapter{
name: name, name: name,
clusterScoped: clusterScoped, clusterScoped: clusterScoped,
@@ -40,30 +40,39 @@ func NewObjectLifecycleAdapter(name string, clusterScoped bool, lifecycle Object
return o.sync return o.sync
} }
func (o *objectLifecycleAdapter) sync(key string, obj runtime.Object) error { func (o *objectLifecycleAdapter) sync(key string, in interface{}) (interface{}, error) {
if obj == nil { if in == nil || reflect.ValueOf(in).IsNil() {
return nil return nil, nil
}
obj, ok := in.(runtime.Object)
if !ok {
return nil, nil
} }
metadata, err := meta.Accessor(obj) metadata, err := meta.Accessor(obj)
if err != nil { if err != nil {
return err return nil, err
} }
if cont, err := o.finalize(metadata, obj); err != nil || !cont { if newObj, cont, err := o.finalize(metadata, obj); err != nil || !cont {
return err return nil, err
} else if newObj != nil {
obj = newObj
} }
if cont, err := o.create(metadata, obj); err != nil || !cont { if newObj, cont, err := o.create(metadata, obj); err != nil || !cont {
return err return nil, err
} else if newObj != nil {
obj = newObj
} }
copyObj := obj.DeepCopyObject() copyObj := obj.DeepCopyObject()
newObj, err := o.lifecycle.Updated(copyObj) newObj, err := o.lifecycle.Updated(copyObj)
if newObj != nil { if newObj != nil {
o.update(metadata.GetName(), obj, newObj) return o.update(metadata.GetName(), obj, newObj)
} }
return err return nil, err
} }
func (o *objectLifecycleAdapter) update(name string, orig, obj runtime.Object) (runtime.Object, error) { func (o *objectLifecycleAdapter) update(name string, orig, obj runtime.Object) (runtime.Object, error) {
@@ -73,34 +82,36 @@ func (o *objectLifecycleAdapter) update(name string, orig, obj runtime.Object) (
return obj, nil return obj, nil
} }
func (o *objectLifecycleAdapter) finalize(metadata metav1.Object, obj runtime.Object) (bool, error) { func (o *objectLifecycleAdapter) finalize(metadata metav1.Object, obj runtime.Object) (runtime.Object, bool, error) {
// Check finalize // Check finalize
if metadata.GetDeletionTimestamp() == nil { if metadata.GetDeletionTimestamp() == nil {
return true, nil return nil, true, nil
} }
if !slice.ContainsString(metadata.GetFinalizers(), o.constructFinalizerKey()) { if !slice.ContainsString(metadata.GetFinalizers(), o.constructFinalizerKey()) {
return false, nil return nil, false, nil
} }
copyObj := obj.DeepCopyObject() copyObj := obj.DeepCopyObject()
if newObj, err := o.lifecycle.Finalize(copyObj); err != nil { if newObj, err := o.lifecycle.Finalize(copyObj); err != nil {
if newObj != nil { if newObj != nil {
o.update(metadata.GetName(), obj, newObj) newObj, _ := o.update(metadata.GetName(), obj, newObj)
return newObj, false, err
} }
return false, err return nil, false, err
} else if newObj != nil { } else if newObj != nil {
copyObj = newObj copyObj = newObj
} }
return false, o.removeFinalizer(o.constructFinalizerKey(), copyObj) newObj, err := o.removeFinalizer(o.constructFinalizerKey(), copyObj)
return newObj, false, err
} }
func (o *objectLifecycleAdapter) removeFinalizer(name string, obj runtime.Object) error { func (o *objectLifecycleAdapter) removeFinalizer(name string, obj runtime.Object) (runtime.Object, error) {
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
metadata, err := meta.Accessor(obj) metadata, err := meta.Accessor(obj)
if err != nil { if err != nil {
return err return nil, err
} }
var finalizers []string var finalizers []string
@@ -112,18 +123,18 @@ func (o *objectLifecycleAdapter) removeFinalizer(name string, obj runtime.Object
} }
metadata.SetFinalizers(finalizers) metadata.SetFinalizers(finalizers)
_, err = o.objectClient.Update(metadata.GetName(), obj) newObj, err := o.objectClient.Update(metadata.GetName(), obj)
if err == nil { if err == nil {
return nil return newObj, nil
} }
obj, err = o.objectClient.GetNamespaced(metadata.GetNamespace(), metadata.GetName(), metav1.GetOptions{}) obj, err = o.objectClient.GetNamespaced(metadata.GetNamespace(), metadata.GetName(), metav1.GetOptions{})
if err != nil { if err != nil {
return err return nil, err
} }
} }
return fmt.Errorf("failed to remove finalizer on %s", name) return nil, fmt.Errorf("failed to remove finalizer on %s", name)
} }
func (o *objectLifecycleAdapter) createKey() string { func (o *objectLifecycleAdapter) createKey() string {
@@ -137,25 +148,26 @@ func (o *objectLifecycleAdapter) constructFinalizerKey() string {
return finalizerKey + o.name return finalizerKey + o.name
} }
func (o *objectLifecycleAdapter) create(metadata metav1.Object, obj runtime.Object) (bool, error) { func (o *objectLifecycleAdapter) create(metadata metav1.Object, obj runtime.Object) (runtime.Object, bool, error) {
if o.isInitialized(metadata) { if o.isInitialized(metadata) {
return true, nil return nil, true, nil
} }
copyObj := obj.DeepCopyObject() copyObj := obj.DeepCopyObject()
copyObj, err := o.addFinalizer(copyObj) copyObj, err := o.addFinalizer(copyObj)
if err != nil { if err != nil {
return false, err return copyObj, false, err
} }
if newObj, err := o.lifecycle.Create(copyObj); err != nil { if newObj, err := o.lifecycle.Create(copyObj); err != nil {
o.update(metadata.GetName(), obj, newObj) newObj, _ = o.update(metadata.GetName(), obj, newObj)
return false, err return newObj, false, err
} else if newObj != nil { } else if newObj != nil {
copyObj = newObj copyObj = newObj
} }
return false, o.setInitialized(copyObj) newObj, err := o.setInitialized(copyObj)
return newObj, false, err
} }
func (o *objectLifecycleAdapter) isInitialized(metadata metav1.Object) bool { func (o *objectLifecycleAdapter) isInitialized(metadata metav1.Object) bool {
@@ -163,10 +175,10 @@ func (o *objectLifecycleAdapter) isInitialized(metadata metav1.Object) bool {
return metadata.GetAnnotations()[initialized] == "true" return metadata.GetAnnotations()[initialized] == "true"
} }
func (o *objectLifecycleAdapter) setInitialized(obj runtime.Object) error { func (o *objectLifecycleAdapter) setInitialized(obj runtime.Object) (runtime.Object, error) {
metadata, err := meta.Accessor(obj) metadata, err := meta.Accessor(obj)
if err != nil { if err != nil {
return err return nil, err
} }
initialized := o.createKey() initialized := o.createKey()
@@ -176,8 +188,7 @@ func (o *objectLifecycleAdapter) setInitialized(obj runtime.Object) error {
} }
metadata.GetAnnotations()[initialized] = "true" metadata.GetAnnotations()[initialized] = "true"
_, err = o.objectClient.Update(metadata.GetName(), obj) return o.objectClient.Update(metadata.GetName(), obj)
return err
} }
func (o *objectLifecycleAdapter) addFinalizer(obj runtime.Object) (runtime.Object, error) { func (o *objectLifecycleAdapter) addFinalizer(obj runtime.Object) (runtime.Object, error) {