From 3632a2ab7a99e03f023a1c381c362dcd462d3f16 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Mon, 19 Nov 2018 16:29:32 -0700 Subject: [PATCH 1/4] Add Track method --- condition/condition.go | 100 ++++++++++++++++++++++++++++++++++------- 1 file changed, 83 insertions(+), 17 deletions(-) diff --git a/condition/condition.go b/condition/condition.go index 172ae7bc..49dafb10 100644 --- a/condition/condition.go +++ b/condition/condition.go @@ -7,7 +7,10 @@ import ( "github.com/pkg/errors" "github.com/rancher/norman/controller" + "github.com/rancher/norman/objectclient" + "k8s.io/api/core/v1" err2 "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -43,6 +46,10 @@ func (c Cond) GetStatus(obj runtime.Object) string { return getStatus(obj, string(c)) } +func (c Cond) SetStatus(obj runtime.Object, status string) { + setStatus(obj, string(c), status) +} + func (c Cond) Unknown(obj runtime.Object) { setStatus(obj, string(c), "Unknown") } @@ -133,53 +140,97 @@ func (c Cond) Do(obj runtime.Object, f func() (runtime.Object, error)) (runtime. return c.do(obj, f) } +type ObjectClientGetter interface { + ObjectClient() *objectclient.ObjectClient +} + +func (c Cond) Track(obj runtime.Object, client ObjectClientGetter, f func() (runtime.Object, error)) (runtime.Object, error) { + obj = obj.DeepCopyObject() + retObj, changed, err := c.do2(false, obj, f) + if !changed { + return retObj, err + } + + c.SetStatus(retObj, c.GetStatus(obj)) + c.LastUpdated(retObj, c.GetLastUpdated(obj)) + c.Reason(retObj, c.GetReason(obj)) + c.Message(retObj, c.GetMessage(obj)) + + if obj, ok := retObj.(metav1.Object); ok { + updated, uerr := client.ObjectClient().Update(obj.GetName(), retObj) + if uerr == nil { + return updated, err + } + } + + return retObj, err +} + func (c Cond) do(obj runtime.Object, f func() (runtime.Object, error)) (runtime.Object, error) { + obj, _, err := c.do2(true, obj, f) + return obj, err +} + +func (c Cond) do2(setReturned bool, obj runtime.Object, f func() (runtime.Object, error)) (runtime.Object, bool, error) { status := c.GetStatus(obj) ts := c.GetLastUpdated(obj) reason := c.GetReason(obj) message := c.GetMessage(obj) - obj, err := c.doInternal(obj, f) + checkObj := obj + retObj, err := c.doInternal(setReturned, obj, f) + if setReturned { + checkObj = retObj + } // This is to prevent non stop flapping of states and update - if status == c.GetStatus(obj) && - reason == c.GetReason(obj) { - if message != c.GetMessage(obj) { - replaced := temfileRegexp.ReplaceAllString(c.GetMessage(obj), "file_path_redacted") - c.Message(obj, replaced) + if status == c.GetStatus(checkObj) && + reason == c.GetReason(checkObj) { + if message != c.GetMessage(checkObj) { + replaced := temfileRegexp.ReplaceAllString(c.GetMessage(checkObj), "file_path_redacted") + c.Message(checkObj, replaced) } - if message == c.GetMessage(obj) { - c.LastUpdated(obj, ts) + if message == c.GetMessage(checkObj) { + c.LastUpdated(checkObj, ts) } } - return obj, err + changed := status != c.GetStatus(checkObj) || + ts != c.GetLastUpdated(checkObj) || + reason != c.GetReason(checkObj) || + message != c.GetMessage(checkObj) + + return retObj, changed, err } -func (c Cond) doInternal(obj runtime.Object, f func() (runtime.Object, error)) (runtime.Object, error) { +func (c Cond) doInternal(setReturned bool, obj runtime.Object, f func() (runtime.Object, error)) (runtime.Object, error) { if !c.IsFalse(obj) { c.Unknown(obj) } + setObject := obj newObj, err := f() if newObj != nil && !reflect.ValueOf(newObj).IsNil() { obj = newObj + if setReturned { + setObject = obj + } } if err != nil { if _, ok := err.(*controller.ForgetError); ok { - if c.GetMessage(obj) == "" { - c.ReasonAndMessageFromError(obj, err) + if c.GetMessage(setObject) == "" { + c.ReasonAndMessageFromError(setObject, err) } return obj, err } - c.False(obj) - c.ReasonAndMessageFromError(obj, err) + c.False(setObject) + c.ReasonAndMessageFromError(setObject, err) return obj, err } - c.True(obj) - c.Reason(obj, "") - c.Message(obj, "") + c.True(setObject) + c.Reason(setObject, "") + c.Message(setObject, "") return obj, nil } @@ -294,3 +345,18 @@ type conditionError struct { func (e *conditionError) Error() string { return e.message } + +type GenericCondition struct { + // Type of cluster condition. + Type string `json:"type"` + // Status of the condition, one of True, False, Unknown. + Status v1.ConditionStatus `json:"status"` + // The last time this condition was updated. + LastUpdateTime string `json:"lastUpdateTime,omitempty"` + // Last time the condition transitioned from one status to another. + LastTransitionTime string `json:"lastTransitionTime,omitempty"` + // The reason for the condition's last transition. + Reason string `json:"reason,omitempty"` + // Human-readable message indicating details about last transition + Message string `json:"message,omitempty"` +} From a0367219000ffb8d9cba3afa9b099d3cbc6ac380 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Mon, 19 Nov 2018 16:30:00 -0700 Subject: [PATCH 2/4] New client style --- generator/controller_template.go | 181 +++++++++++++++++++++++++++++++ generator/k8s_client_template.go | 36 +++++- lifecycle/object.go | 130 +++++++++++++++------- 3 files changed, 306 insertions(+), 41 deletions(-) diff --git a/generator/controller_template.go b/generator/controller_template.go index c913dfec..b0fd44ec 100644 --- a/generator/controller_template.go +++ b/generator/controller_template.go @@ -43,6 +43,8 @@ type {{.schema.CodeName}}List struct { type {{.schema.CodeName}}HandlerFunc func(key string, obj *{{.prefix}}{{.schema.CodeName}}) (runtime.Object, error) +type {{.schema.CodeName}}ChangeHandlerFunc func(obj *{{.prefix}}{{.schema.CodeName}}) (runtime.Object, error) + type {{.schema.CodeName}}Lister interface { List(namespace string, selector labels.Selector) (ret []*{{.prefix}}{{.schema.CodeName}}, err error) Get(namespace, name string) (*{{.prefix}}{{.schema.CodeName}}, error) @@ -255,4 +257,183 @@ func (s *{{.schema.ID}}Client) AddClusterScopedLifecycle(ctx context.Context, na sync := New{{.schema.CodeName}}LifecycleAdapter(name+"_"+clusterName, true, s, lifecycle) s.Controller().AddClusterScopedHandler(ctx, name, clusterName, sync) } + +type {{.schema.CodeName}}Indexer func(obj *{{.prefix}}{{.schema.CodeName}}) ([]string, error) + +type {{.schema.CodeName}}ClientCache interface { + Get(namespace, name string) (*{{.prefix}}{{.schema.CodeName}}, error) + List(namespace string, selector labels.Selector) ([]*{{.prefix}}{{.schema.CodeName}}, error) + + Index(name string, indexer {{.schema.CodeName}}Indexer) + GetIndexed(name, key string) ([]*{{.prefix}}{{.schema.CodeName}}, error) +} + +type {{.schema.CodeName}}Client interface { + Create(*{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) + Get(namespace, name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) + Update(*{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) + Delete(namespace, name string, options *metav1.DeleteOptions) error + List(namespace string, opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) + Watch(opts metav1.ListOptions) (watch.Interface, error) + + Cache() {{.schema.CodeName}}ClientCache + + OnCreate(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc) + OnChange(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc) + OnRemove(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc) + Enqueue(namespace, name string) + + Generic() controller.GenericController + Interface() {{.schema.CodeName}}Interface +} + +type {{.schema.ID}}ClientCache struct { + client *{{.schema.ID}}Client2 +} + +type {{.schema.ID}}Client2 struct { + iface {{.schema.CodeName}}Interface + controller {{.schema.CodeName}}Controller +} + +func (n *{{.schema.ID}}Client2) Interface() {{.schema.CodeName}}Interface { + return n.iface +} + +func (n *{{.schema.ID}}Client2) Generic() controller.GenericController { + return n.iface.Controller().Generic() +} + +func (n *{{.schema.ID}}Client2) Enqueue(namespace, name string) { + n.iface.Controller().Enqueue(namespace, name) +} + +func (n *{{.schema.ID}}Client2) Create(obj *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) { + return n.iface.Create(obj) +} + +func (n *{{.schema.ID}}Client2) Get(namespace, name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) { + return n.iface.GetNamespaced(namespace, name, opts) +} + +func (n *{{.schema.ID}}Client2) Update(obj *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) { + return n.iface.Update(obj) +} + +func (n *{{.schema.ID}}Client2) Delete(namespace, name string, options *metav1.DeleteOptions) error { + return n.iface.DeleteNamespaced(namespace, name, options) +} + +func (n *{{.schema.ID}}Client2) List(namespace string, opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) { + return n.iface.List(opts) +} + +func (n *{{.schema.ID}}Client2) Watch(opts metav1.ListOptions) (watch.Interface, error) { + return n.iface.Watch(opts) +} + +func (n *{{.schema.ID}}ClientCache) Get(namespace, name string) (*{{.prefix}}{{.schema.CodeName}}, error) { + return n.client.controller.Lister().Get(namespace, name) +} + +func (n *{{.schema.ID}}ClientCache) List(namespace string, selector labels.Selector) ([]*{{.prefix}}{{.schema.CodeName}}, error) { + return n.client.controller.Lister().List(namespace, selector) +} + +func (n *{{.schema.ID}}Client2) Cache() {{.schema.CodeName}}ClientCache { + n.loadController() + return &{{.schema.ID}}ClientCache{ + client: n, + } +} + +func (n *{{.schema.ID}}Client2) OnCreate(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc) { + n.loadController() + n.iface.AddLifecycle(ctx, name, &{{.schema.ID}}LifecycleDelegate{create: sync}) +} + +func (n *{{.schema.ID}}Client2) OnChange(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc) { + n.loadController() + n.iface.AddLifecycle(ctx, name, &{{.schema.ID}}LifecycleDelegate{update: sync}) +} + +func (n *{{.schema.ID}}Client2) OnRemove(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc) { + n.loadController() + n.iface.AddLifecycle(ctx, name, &{{.schema.ID}}LifecycleDelegate{remove: sync}) +} + +func (n *{{.schema.ID}}ClientCache) Index(name string, indexer {{.schema.CodeName}}Indexer) { + err := n.client.controller.Informer().GetIndexer().AddIndexers(map[string]cache.IndexFunc{ + name: func(obj interface{}) ([]string, error) { + if v, ok := obj.(*{{.prefix}}{{.schema.CodeName}}); ok { + return indexer(v) + } + return nil, nil + }, + }) + + if err != nil { + panic(err) + } +} + +func (n *{{.schema.ID}}ClientCache) GetIndexed(name, key string) ([]*{{.prefix}}{{.schema.CodeName}}, error) { + var result []*{{.prefix}}{{.schema.CodeName}} + objs, err := n.client.controller.Informer().GetIndexer().ByIndex(name, key) + if err != nil { + return nil, err + } + for _, obj := range objs { + if v, ok := obj.(*{{.prefix}}{{.schema.CodeName}}); ok { + result = append(result, v) + } + } + + return result, nil +} + +func (n *{{.schema.ID}}Client2) loadController() { + if n.controller == nil { + n.controller = n.iface.Controller() + } +} + +type {{.schema.ID}}LifecycleDelegate struct { + create {{.schema.CodeName}}ChangeHandlerFunc + update {{.schema.CodeName}}ChangeHandlerFunc + remove {{.schema.CodeName}}ChangeHandlerFunc +} + +func (n *{{.schema.ID}}LifecycleDelegate) HasCreate() bool { + return n.create != nil +} + +func (n *{{.schema.ID}}LifecycleDelegate) Create(obj *{{.prefix}}{{.schema.CodeName}}) (runtime.Object, error) { + if n.create == nil { + return obj, nil + } + return n.create(obj) +} + +func (n *{{.schema.ID}}LifecycleDelegate) HasFinalize() bool { + return n.remove != nil +} + +func (n *{{.schema.ID}}LifecycleDelegate) Remove(obj *{{.prefix}}{{.schema.CodeName}}) (runtime.Object, error) { + if n.remove == nil { + return obj, nil + } + return n.remove(obj) +} + +func (n *{{.schema.ID}}LifecycleDelegate) HasUpdated() bool { + return n.remove != nil +} + +func (n *{{.schema.ID}}LifecycleDelegate) Updated(obj *{{.prefix}}{{.schema.CodeName}}) (runtime.Object, error) { + if n.update == nil { + return obj, nil + } + return n.update(obj) +} ` diff --git a/generator/k8s_client_template.go b/generator/k8s_client_template.go index 548b35c4..ee1b06f0 100644 --- a/generator/k8s_client_template.go +++ b/generator/k8s_client_template.go @@ -13,7 +13,10 @@ import ( "k8s.io/client-go/rest" ) -type contextKeyType struct{} +type ( + contextKeyType struct{} + contextClientsKeyType struct{} +) type Interface interface { RESTClient() rest.Interface @@ -22,6 +25,11 @@ type Interface interface { {{.CodeNamePlural}}Getter{{end}} } +type Clients struct { + {{range .schemas}} + {{.CodeName}} {{.CodeName}}Client{{end}} +} + type Client struct { sync.Mutex restClient rest.Interface @@ -36,13 +44,37 @@ func Factory(ctx context.Context, config rest.Config) (context.Context, controll return ctx, nil, err } - return context.WithValue(ctx, contextKeyType{}, c), c, nil + cs := NewClientsFromInterface(c) + + ctx = context.WithValue(ctx, contextKeyType{}, c) + ctx = context.WithValue(ctx, contextClientsKeyType{}, cs) + return ctx, c, nil +} + +func ClientsFrom(ctx context.Context) *Clients { + return ctx.Value(contextClientsKeyType{}).(*Clients) } func From(ctx context.Context) Interface { return ctx.Value(contextKeyType{}).(Interface) } +func NewClients(config rest.Config) (*Clients, error) { + iface, err := NewForConfig(config) + if err != nil { + return nil, err + } + return NewClientsFromInterface(iface), nil +} + +func NewClientsFromInterface(iface Interface) *Clients { + return &Clients{ + {{range .schemas}} + {{.CodeName}}: &{{.ID}}Client2{ + iface: iface.{{.CodeNamePlural}}(""), + },{{end}} + } +} func NewForConfig(config rest.Config) (Interface, error) { if config.NegotiatedSerializer == nil { diff --git a/lifecycle/object.go b/lifecycle/object.go index 10c472df..b0558cc9 100644 --- a/lifecycle/object.go +++ b/lifecycle/object.go @@ -23,6 +23,12 @@ type ObjectLifecycle interface { Updated(obj runtime.Object) (runtime.Object, error) } +type ObjectLifecycleCondition interface { + HasCreate() bool + HasFinalize() bool + HasUpdated() bool +} + type objectLifecycleAdapter struct { name string clusterScoped bool @@ -50,39 +56,41 @@ func (o *objectLifecycleAdapter) sync(key string, in interface{}) (interface{}, return nil, nil } - metadata, err := meta.Accessor(obj) - if err != nil { - return nil, err - } - - if newObj, cont, err := o.finalize(metadata, obj); err != nil || !cont { + if newObj, cont, err := o.finalize(obj); err != nil || !cont { return nil, err } else if newObj != nil { obj = newObj } - if newObj, cont, err := o.create(metadata, obj); err != nil || !cont { + if newObj, cont, err := o.create(obj); err != nil || !cont { return nil, err } else if newObj != nil { obj = newObj } - copyObj := obj.DeepCopyObject() - newObj, err := o.lifecycle.Updated(copyObj) - if newObj != nil { - return o.update(metadata.GetName(), obj, newObj) - } - return nil, err + return o.record(obj, o.lifecycle.Updated) } func (o *objectLifecycleAdapter) update(name string, orig, obj runtime.Object) (runtime.Object, error) { - if obj != nil && !reflect.DeepEqual(orig, obj) { - return o.objectClient.Update(name, obj) + if obj != nil && orig != nil && !reflect.DeepEqual(orig, obj) { + newObj, err := o.objectClient.Update(name, obj) + if newObj != nil { + return newObj, err + } + return obj, err + } + if obj == nil { + return orig, nil } return obj, nil } -func (o *objectLifecycleAdapter) finalize(metadata metav1.Object, obj runtime.Object) (runtime.Object, bool, error) { +func (o *objectLifecycleAdapter) finalize(obj runtime.Object) (runtime.Object, bool, error) { + metadata, err := meta.Accessor(obj) + if err != nil { + return obj, false, err + } + // Check finalize if metadata.GetDeletionTimestamp() == nil { return nil, true, nil @@ -92,19 +100,20 @@ func (o *objectLifecycleAdapter) finalize(metadata metav1.Object, obj runtime.Ob return nil, false, nil } - copyObj := obj.DeepCopyObject() - if newObj, err := o.lifecycle.Finalize(copyObj); err != nil { - if newObj != nil { - newObj, _ := o.update(metadata.GetName(), obj, newObj) - return newObj, false, err - } - return nil, false, err - } else if newObj != nil { - copyObj = newObj + newObj, err := o.record(obj, o.lifecycle.Finalize) + if err != nil { + return obj, false, err } - newObj, err := o.removeFinalizer(o.constructFinalizerKey(), copyObj) - return newObj, false, err + obj, err = o.removeFinalizer(o.constructFinalizerKey(), maybeDeepCopy(obj, newObj)) + return obj, false, err +} + +func maybeDeepCopy(old, newObj runtime.Object) runtime.Object { + if old == newObj { + return old.DeepCopyObject() + } + return newObj } func (o *objectLifecycleAdapter) removeFinalizer(name string, obj runtime.Object) (runtime.Object, error) { @@ -148,26 +157,63 @@ func (o *objectLifecycleAdapter) constructFinalizerKey() string { return finalizerKey + o.name } -func (o *objectLifecycleAdapter) create(metadata metav1.Object, obj runtime.Object) (runtime.Object, bool, error) { +func (o *objectLifecycleAdapter) hasFinalize() bool { + cond, ok := o.lifecycle.(ObjectLifecycleCondition) + return !ok || cond.HasFinalize() +} + +func (o *objectLifecycleAdapter) hasCreate() bool { + cond, ok := o.lifecycle.(ObjectLifecycleCondition) + return !ok || cond.HasCreate() +} + +func (o *objectLifecycleAdapter) record(obj runtime.Object, f func(runtime.Object) (runtime.Object, error)) (runtime.Object, error) { + metadata, err := meta.Accessor(obj) + if err != nil { + return obj, err + } + + obj = obj.DeepCopyObject() + if newObj, err := f(obj); err != nil { + newObj, _ = o.update(metadata.GetName(), obj, newObj) + return newObj, err + } else if newObj != nil { + newMetadata, err := meta.Accessor(newObj) + if err != nil { + // don't return error, no original error + return newObj, nil + } + if newMetadata.GetResourceVersion() == metadata.GetResourceVersion() { + return o.update(metadata.GetName(), obj, newObj) + } + return newObj, nil + } + return obj, nil +} + +func (o *objectLifecycleAdapter) create(obj runtime.Object) (runtime.Object, bool, error) { + metadata, err := meta.Accessor(obj) + if err != nil { + return obj, false, err + } + if o.isInitialized(metadata) { return nil, true, nil } - copyObj := obj.DeepCopyObject() - copyObj, err := o.addFinalizer(copyObj) - if err != nil { - return copyObj, false, err + if o.hasFinalize() { + obj, err = o.addFinalizer(obj) + if err != nil { + return obj, false, err + } } - if newObj, err := o.lifecycle.Create(copyObj); err != nil { - newObj, _ = o.update(metadata.GetName(), obj, newObj) - return newObj, false, err - } else if newObj != nil { - copyObj = newObj + if !o.hasCreate() { + return obj, true, err } - newObj, err := o.setInitialized(copyObj) - return newObj, false, err + obj, err = o.record(obj, o.lifecycle.Create) + return obj, false, err } func (o *objectLifecycleAdapter) isInitialized(metadata metav1.Object) bool { @@ -201,6 +247,12 @@ func (o *objectLifecycleAdapter) addFinalizer(obj runtime.Object) (runtime.Objec return obj, nil } + obj = obj.DeepCopyObject() + metadata, err = meta.Accessor(obj) + if err != nil { + return nil, err + } + metadata.SetFinalizers(append(metadata.GetFinalizers(), o.constructFinalizerKey())) return o.objectClient.Update(metadata.GetName(), obj) } From 1561b42e181f61996c88a2068b111794c76762da Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Mon, 19 Nov 2018 16:30:21 -0700 Subject: [PATCH 3/4] Change changeset enqueuer to interface --- pkg/changeset/changeset.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/changeset/changeset.go b/pkg/changeset/changeset.go index 13ed2df8..d6c5a673 100644 --- a/pkg/changeset/changeset.go +++ b/pkg/changeset/changeset.go @@ -17,7 +17,9 @@ type ControllerProvider interface { Generic() controller.GenericController } -type Enqueuer func(namespace, name string) +type Enqueuer interface { + Enqueue(namespace, name string) +} type Resolver func(namespace, name string, obj runtime.Object) ([]Key, error) @@ -63,7 +65,7 @@ func watch(ctx context.Context, name string, enq Enqueuer, resolve Resolver, gen for _, key := range keys { if key.Name != "" { - enq(key.Namespace, key.Name) + enq.Enqueue(key.Namespace, key.Name) } } From f88fcd722365303f00bd0dfc24a59b87647a2549 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Mon, 19 Nov 2018 16:37:47 -0700 Subject: [PATCH 4/4] Fix bug where we don't trigger handlers on second start --- controller/generic_controller.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/controller/generic_controller.go b/controller/generic_controller.go index 87453a36..c5213175 100644 --- a/controller/generic_controller.go +++ b/controller/generic_controller.go @@ -200,7 +200,7 @@ func (g *genericController) Start(ctx context.Context, threadiness int) error { if g.running { for _, h := range g.handlers { - if h.generation < g.generation { + if h.generation != g.generation { continue } for _, key := range g.informer.GetStore().ListKeys() { @@ -219,6 +219,11 @@ func (g *genericController) Start(ctx context.Context, threadiness int) error { } func (g *genericController) queueObject(obj interface{}) { + if _, ok := obj.(generationKey); ok { + g.queue.Add(obj) + return + } + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { g.queue.Add(key) @@ -267,7 +272,11 @@ func (g *genericController) processNextWorkItem() bool { logrus.Errorf("%v %v %v", g.name, key, err) } - g.queue.AddRateLimited(key) + if gk, ok := key.(generationKey); ok { + g.queue.AddRateLimited(gk.key) + } else { + g.queue.AddRateLimited(key) + } return true }