mirror of
https://github.com/rancher/norman.git
synced 2025-09-18 16:35:19 +00:00
@@ -7,7 +7,10 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rancher/norman/controller"
|
||||
"github.com/rancher/norman/objectclient"
|
||||
"k8s.io/api/core/v1"
|
||||
err2 "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
@@ -43,6 +46,10 @@ func (c Cond) GetStatus(obj runtime.Object) string {
|
||||
return getStatus(obj, string(c))
|
||||
}
|
||||
|
||||
func (c Cond) SetStatus(obj runtime.Object, status string) {
|
||||
setStatus(obj, string(c), status)
|
||||
}
|
||||
|
||||
func (c Cond) Unknown(obj runtime.Object) {
|
||||
setStatus(obj, string(c), "Unknown")
|
||||
}
|
||||
@@ -133,53 +140,97 @@ func (c Cond) Do(obj runtime.Object, f func() (runtime.Object, error)) (runtime.
|
||||
return c.do(obj, f)
|
||||
}
|
||||
|
||||
type ObjectClientGetter interface {
|
||||
ObjectClient() *objectclient.ObjectClient
|
||||
}
|
||||
|
||||
func (c Cond) Track(obj runtime.Object, client ObjectClientGetter, f func() (runtime.Object, error)) (runtime.Object, error) {
|
||||
obj = obj.DeepCopyObject()
|
||||
retObj, changed, err := c.do2(false, obj, f)
|
||||
if !changed {
|
||||
return retObj, err
|
||||
}
|
||||
|
||||
c.SetStatus(retObj, c.GetStatus(obj))
|
||||
c.LastUpdated(retObj, c.GetLastUpdated(obj))
|
||||
c.Reason(retObj, c.GetReason(obj))
|
||||
c.Message(retObj, c.GetMessage(obj))
|
||||
|
||||
if obj, ok := retObj.(metav1.Object); ok {
|
||||
updated, uerr := client.ObjectClient().Update(obj.GetName(), retObj)
|
||||
if uerr == nil {
|
||||
return updated, err
|
||||
}
|
||||
}
|
||||
|
||||
return retObj, err
|
||||
}
|
||||
|
||||
func (c Cond) do(obj runtime.Object, f func() (runtime.Object, error)) (runtime.Object, error) {
|
||||
obj, _, err := c.do2(true, obj, f)
|
||||
return obj, err
|
||||
}
|
||||
|
||||
func (c Cond) do2(setReturned bool, obj runtime.Object, f func() (runtime.Object, error)) (runtime.Object, bool, error) {
|
||||
status := c.GetStatus(obj)
|
||||
ts := c.GetLastUpdated(obj)
|
||||
reason := c.GetReason(obj)
|
||||
message := c.GetMessage(obj)
|
||||
|
||||
obj, err := c.doInternal(obj, f)
|
||||
checkObj := obj
|
||||
retObj, err := c.doInternal(setReturned, obj, f)
|
||||
if setReturned {
|
||||
checkObj = retObj
|
||||
}
|
||||
|
||||
// This is to prevent non stop flapping of states and update
|
||||
if status == c.GetStatus(obj) &&
|
||||
reason == c.GetReason(obj) {
|
||||
if message != c.GetMessage(obj) {
|
||||
replaced := temfileRegexp.ReplaceAllString(c.GetMessage(obj), "file_path_redacted")
|
||||
c.Message(obj, replaced)
|
||||
if status == c.GetStatus(checkObj) &&
|
||||
reason == c.GetReason(checkObj) {
|
||||
if message != c.GetMessage(checkObj) {
|
||||
replaced := temfileRegexp.ReplaceAllString(c.GetMessage(checkObj), "file_path_redacted")
|
||||
c.Message(checkObj, replaced)
|
||||
}
|
||||
if message == c.GetMessage(obj) {
|
||||
c.LastUpdated(obj, ts)
|
||||
if message == c.GetMessage(checkObj) {
|
||||
c.LastUpdated(checkObj, ts)
|
||||
}
|
||||
}
|
||||
|
||||
return obj, err
|
||||
changed := status != c.GetStatus(checkObj) ||
|
||||
ts != c.GetLastUpdated(checkObj) ||
|
||||
reason != c.GetReason(checkObj) ||
|
||||
message != c.GetMessage(checkObj)
|
||||
|
||||
return retObj, changed, err
|
||||
}
|
||||
|
||||
func (c Cond) doInternal(obj runtime.Object, f func() (runtime.Object, error)) (runtime.Object, error) {
|
||||
func (c Cond) doInternal(setReturned bool, obj runtime.Object, f func() (runtime.Object, error)) (runtime.Object, error) {
|
||||
if !c.IsFalse(obj) {
|
||||
c.Unknown(obj)
|
||||
}
|
||||
|
||||
setObject := obj
|
||||
newObj, err := f()
|
||||
if newObj != nil && !reflect.ValueOf(newObj).IsNil() {
|
||||
obj = newObj
|
||||
if setReturned {
|
||||
setObject = obj
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if _, ok := err.(*controller.ForgetError); ok {
|
||||
if c.GetMessage(obj) == "" {
|
||||
c.ReasonAndMessageFromError(obj, err)
|
||||
if c.GetMessage(setObject) == "" {
|
||||
c.ReasonAndMessageFromError(setObject, err)
|
||||
}
|
||||
return obj, err
|
||||
}
|
||||
c.False(obj)
|
||||
c.ReasonAndMessageFromError(obj, err)
|
||||
c.False(setObject)
|
||||
c.ReasonAndMessageFromError(setObject, err)
|
||||
return obj, err
|
||||
}
|
||||
c.True(obj)
|
||||
c.Reason(obj, "")
|
||||
c.Message(obj, "")
|
||||
c.True(setObject)
|
||||
c.Reason(setObject, "")
|
||||
c.Message(setObject, "")
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
@@ -294,3 +345,18 @@ type conditionError struct {
|
||||
func (e *conditionError) Error() string {
|
||||
return e.message
|
||||
}
|
||||
|
||||
type GenericCondition struct {
|
||||
// Type of cluster condition.
|
||||
Type string `json:"type"`
|
||||
// Status of the condition, one of True, False, Unknown.
|
||||
Status v1.ConditionStatus `json:"status"`
|
||||
// The last time this condition was updated.
|
||||
LastUpdateTime string `json:"lastUpdateTime,omitempty"`
|
||||
// Last time the condition transitioned from one status to another.
|
||||
LastTransitionTime string `json:"lastTransitionTime,omitempty"`
|
||||
// The reason for the condition's last transition.
|
||||
Reason string `json:"reason,omitempty"`
|
||||
// Human-readable message indicating details about last transition
|
||||
Message string `json:"message,omitempty"`
|
||||
}
|
||||
|
@@ -200,7 +200,7 @@ func (g *genericController) Start(ctx context.Context, threadiness int) error {
|
||||
|
||||
if g.running {
|
||||
for _, h := range g.handlers {
|
||||
if h.generation < g.generation {
|
||||
if h.generation != g.generation {
|
||||
continue
|
||||
}
|
||||
for _, key := range g.informer.GetStore().ListKeys() {
|
||||
@@ -219,6 +219,11 @@ func (g *genericController) Start(ctx context.Context, threadiness int) error {
|
||||
}
|
||||
|
||||
func (g *genericController) queueObject(obj interface{}) {
|
||||
if _, ok := obj.(generationKey); ok {
|
||||
g.queue.Add(obj)
|
||||
return
|
||||
}
|
||||
|
||||
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
if err == nil {
|
||||
g.queue.Add(key)
|
||||
@@ -267,7 +272,11 @@ func (g *genericController) processNextWorkItem() bool {
|
||||
logrus.Errorf("%v %v %v", g.name, key, err)
|
||||
}
|
||||
|
||||
g.queue.AddRateLimited(key)
|
||||
if gk, ok := key.(generationKey); ok {
|
||||
g.queue.AddRateLimited(gk.key)
|
||||
} else {
|
||||
g.queue.AddRateLimited(key)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
@@ -43,6 +43,8 @@ type {{.schema.CodeName}}List struct {
|
||||
|
||||
type {{.schema.CodeName}}HandlerFunc func(key string, obj *{{.prefix}}{{.schema.CodeName}}) (runtime.Object, error)
|
||||
|
||||
type {{.schema.CodeName}}ChangeHandlerFunc func(obj *{{.prefix}}{{.schema.CodeName}}) (runtime.Object, error)
|
||||
|
||||
type {{.schema.CodeName}}Lister interface {
|
||||
List(namespace string, selector labels.Selector) (ret []*{{.prefix}}{{.schema.CodeName}}, err error)
|
||||
Get(namespace, name string) (*{{.prefix}}{{.schema.CodeName}}, error)
|
||||
@@ -255,4 +257,183 @@ func (s *{{.schema.ID}}Client) AddClusterScopedLifecycle(ctx context.Context, na
|
||||
sync := New{{.schema.CodeName}}LifecycleAdapter(name+"_"+clusterName, true, s, lifecycle)
|
||||
s.Controller().AddClusterScopedHandler(ctx, name, clusterName, sync)
|
||||
}
|
||||
|
||||
type {{.schema.CodeName}}Indexer func(obj *{{.prefix}}{{.schema.CodeName}}) ([]string, error)
|
||||
|
||||
type {{.schema.CodeName}}ClientCache interface {
|
||||
Get(namespace, name string) (*{{.prefix}}{{.schema.CodeName}}, error)
|
||||
List(namespace string, selector labels.Selector) ([]*{{.prefix}}{{.schema.CodeName}}, error)
|
||||
|
||||
Index(name string, indexer {{.schema.CodeName}}Indexer)
|
||||
GetIndexed(name, key string) ([]*{{.prefix}}{{.schema.CodeName}}, error)
|
||||
}
|
||||
|
||||
type {{.schema.CodeName}}Client interface {
|
||||
Create(*{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error)
|
||||
Get(namespace, name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error)
|
||||
Update(*{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error)
|
||||
Delete(namespace, name string, options *metav1.DeleteOptions) error
|
||||
List(namespace string, opts metav1.ListOptions) (*{{.schema.CodeName}}List, error)
|
||||
Watch(opts metav1.ListOptions) (watch.Interface, error)
|
||||
|
||||
Cache() {{.schema.CodeName}}ClientCache
|
||||
|
||||
OnCreate(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc)
|
||||
OnChange(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc)
|
||||
OnRemove(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc)
|
||||
Enqueue(namespace, name string)
|
||||
|
||||
Generic() controller.GenericController
|
||||
Interface() {{.schema.CodeName}}Interface
|
||||
}
|
||||
|
||||
type {{.schema.ID}}ClientCache struct {
|
||||
client *{{.schema.ID}}Client2
|
||||
}
|
||||
|
||||
type {{.schema.ID}}Client2 struct {
|
||||
iface {{.schema.CodeName}}Interface
|
||||
controller {{.schema.CodeName}}Controller
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) Interface() {{.schema.CodeName}}Interface {
|
||||
return n.iface
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) Generic() controller.GenericController {
|
||||
return n.iface.Controller().Generic()
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) Enqueue(namespace, name string) {
|
||||
n.iface.Controller().Enqueue(namespace, name)
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) Create(obj *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) {
|
||||
return n.iface.Create(obj)
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) Get(namespace, name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) {
|
||||
return n.iface.GetNamespaced(namespace, name, opts)
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) Update(obj *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) {
|
||||
return n.iface.Update(obj)
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) Delete(namespace, name string, options *metav1.DeleteOptions) error {
|
||||
return n.iface.DeleteNamespaced(namespace, name, options)
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) List(namespace string, opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) {
|
||||
return n.iface.List(opts)
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) Watch(opts metav1.ListOptions) (watch.Interface, error) {
|
||||
return n.iface.Watch(opts)
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}ClientCache) Get(namespace, name string) (*{{.prefix}}{{.schema.CodeName}}, error) {
|
||||
return n.client.controller.Lister().Get(namespace, name)
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}ClientCache) List(namespace string, selector labels.Selector) ([]*{{.prefix}}{{.schema.CodeName}}, error) {
|
||||
return n.client.controller.Lister().List(namespace, selector)
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) Cache() {{.schema.CodeName}}ClientCache {
|
||||
n.loadController()
|
||||
return &{{.schema.ID}}ClientCache{
|
||||
client: n,
|
||||
}
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) OnCreate(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc) {
|
||||
n.loadController()
|
||||
n.iface.AddLifecycle(ctx, name, &{{.schema.ID}}LifecycleDelegate{create: sync})
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) OnChange(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc) {
|
||||
n.loadController()
|
||||
n.iface.AddLifecycle(ctx, name, &{{.schema.ID}}LifecycleDelegate{update: sync})
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) OnRemove(ctx context.Context, name string, sync {{.schema.CodeName}}ChangeHandlerFunc) {
|
||||
n.loadController()
|
||||
n.iface.AddLifecycle(ctx, name, &{{.schema.ID}}LifecycleDelegate{remove: sync})
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}ClientCache) Index(name string, indexer {{.schema.CodeName}}Indexer) {
|
||||
err := n.client.controller.Informer().GetIndexer().AddIndexers(map[string]cache.IndexFunc{
|
||||
name: func(obj interface{}) ([]string, error) {
|
||||
if v, ok := obj.(*{{.prefix}}{{.schema.CodeName}}); ok {
|
||||
return indexer(v)
|
||||
}
|
||||
return nil, nil
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}ClientCache) GetIndexed(name, key string) ([]*{{.prefix}}{{.schema.CodeName}}, error) {
|
||||
var result []*{{.prefix}}{{.schema.CodeName}}
|
||||
objs, err := n.client.controller.Informer().GetIndexer().ByIndex(name, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, obj := range objs {
|
||||
if v, ok := obj.(*{{.prefix}}{{.schema.CodeName}}); ok {
|
||||
result = append(result, v)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}Client2) loadController() {
|
||||
if n.controller == nil {
|
||||
n.controller = n.iface.Controller()
|
||||
}
|
||||
}
|
||||
|
||||
type {{.schema.ID}}LifecycleDelegate struct {
|
||||
create {{.schema.CodeName}}ChangeHandlerFunc
|
||||
update {{.schema.CodeName}}ChangeHandlerFunc
|
||||
remove {{.schema.CodeName}}ChangeHandlerFunc
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}LifecycleDelegate) HasCreate() bool {
|
||||
return n.create != nil
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}LifecycleDelegate) Create(obj *{{.prefix}}{{.schema.CodeName}}) (runtime.Object, error) {
|
||||
if n.create == nil {
|
||||
return obj, nil
|
||||
}
|
||||
return n.create(obj)
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}LifecycleDelegate) HasFinalize() bool {
|
||||
return n.remove != nil
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}LifecycleDelegate) Remove(obj *{{.prefix}}{{.schema.CodeName}}) (runtime.Object, error) {
|
||||
if n.remove == nil {
|
||||
return obj, nil
|
||||
}
|
||||
return n.remove(obj)
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}LifecycleDelegate) HasUpdated() bool {
|
||||
return n.remove != nil
|
||||
}
|
||||
|
||||
func (n *{{.schema.ID}}LifecycleDelegate) Updated(obj *{{.prefix}}{{.schema.CodeName}}) (runtime.Object, error) {
|
||||
if n.update == nil {
|
||||
return obj, nil
|
||||
}
|
||||
return n.update(obj)
|
||||
}
|
||||
`
|
||||
|
@@ -13,7 +13,10 @@ import (
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
type contextKeyType struct{}
|
||||
type (
|
||||
contextKeyType struct{}
|
||||
contextClientsKeyType struct{}
|
||||
)
|
||||
|
||||
type Interface interface {
|
||||
RESTClient() rest.Interface
|
||||
@@ -22,6 +25,11 @@ type Interface interface {
|
||||
{{.CodeNamePlural}}Getter{{end}}
|
||||
}
|
||||
|
||||
type Clients struct {
|
||||
{{range .schemas}}
|
||||
{{.CodeName}} {{.CodeName}}Client{{end}}
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
sync.Mutex
|
||||
restClient rest.Interface
|
||||
@@ -36,13 +44,37 @@ func Factory(ctx context.Context, config rest.Config) (context.Context, controll
|
||||
return ctx, nil, err
|
||||
}
|
||||
|
||||
return context.WithValue(ctx, contextKeyType{}, c), c, nil
|
||||
cs := NewClientsFromInterface(c)
|
||||
|
||||
ctx = context.WithValue(ctx, contextKeyType{}, c)
|
||||
ctx = context.WithValue(ctx, contextClientsKeyType{}, cs)
|
||||
return ctx, c, nil
|
||||
}
|
||||
|
||||
func ClientsFrom(ctx context.Context) *Clients {
|
||||
return ctx.Value(contextClientsKeyType{}).(*Clients)
|
||||
}
|
||||
|
||||
func From(ctx context.Context) Interface {
|
||||
return ctx.Value(contextKeyType{}).(Interface)
|
||||
}
|
||||
|
||||
func NewClients(config rest.Config) (*Clients, error) {
|
||||
iface, err := NewForConfig(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewClientsFromInterface(iface), nil
|
||||
}
|
||||
|
||||
func NewClientsFromInterface(iface Interface) *Clients {
|
||||
return &Clients{
|
||||
{{range .schemas}}
|
||||
{{.CodeName}}: &{{.ID}}Client2{
|
||||
iface: iface.{{.CodeNamePlural}}(""),
|
||||
},{{end}}
|
||||
}
|
||||
}
|
||||
|
||||
func NewForConfig(config rest.Config) (Interface, error) {
|
||||
if config.NegotiatedSerializer == nil {
|
||||
|
@@ -23,6 +23,12 @@ type ObjectLifecycle interface {
|
||||
Updated(obj runtime.Object) (runtime.Object, error)
|
||||
}
|
||||
|
||||
type ObjectLifecycleCondition interface {
|
||||
HasCreate() bool
|
||||
HasFinalize() bool
|
||||
HasUpdated() bool
|
||||
}
|
||||
|
||||
type objectLifecycleAdapter struct {
|
||||
name string
|
||||
clusterScoped bool
|
||||
@@ -50,39 +56,41 @@ func (o *objectLifecycleAdapter) sync(key string, in interface{}) (interface{},
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if newObj, cont, err := o.finalize(metadata, obj); err != nil || !cont {
|
||||
if newObj, cont, err := o.finalize(obj); err != nil || !cont {
|
||||
return nil, err
|
||||
} else if newObj != nil {
|
||||
obj = newObj
|
||||
}
|
||||
|
||||
if newObj, cont, err := o.create(metadata, obj); err != nil || !cont {
|
||||
if newObj, cont, err := o.create(obj); err != nil || !cont {
|
||||
return nil, err
|
||||
} else if newObj != nil {
|
||||
obj = newObj
|
||||
}
|
||||
|
||||
copyObj := obj.DeepCopyObject()
|
||||
newObj, err := o.lifecycle.Updated(copyObj)
|
||||
if newObj != nil {
|
||||
return o.update(metadata.GetName(), obj, newObj)
|
||||
}
|
||||
return nil, err
|
||||
return o.record(obj, o.lifecycle.Updated)
|
||||
}
|
||||
|
||||
func (o *objectLifecycleAdapter) update(name string, orig, obj runtime.Object) (runtime.Object, error) {
|
||||
if obj != nil && !reflect.DeepEqual(orig, obj) {
|
||||
return o.objectClient.Update(name, obj)
|
||||
if obj != nil && orig != nil && !reflect.DeepEqual(orig, obj) {
|
||||
newObj, err := o.objectClient.Update(name, obj)
|
||||
if newObj != nil {
|
||||
return newObj, err
|
||||
}
|
||||
return obj, err
|
||||
}
|
||||
if obj == nil {
|
||||
return orig, nil
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (o *objectLifecycleAdapter) finalize(metadata metav1.Object, obj runtime.Object) (runtime.Object, bool, error) {
|
||||
func (o *objectLifecycleAdapter) finalize(obj runtime.Object) (runtime.Object, bool, error) {
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return obj, false, err
|
||||
}
|
||||
|
||||
// Check finalize
|
||||
if metadata.GetDeletionTimestamp() == nil {
|
||||
return nil, true, nil
|
||||
@@ -92,19 +100,20 @@ func (o *objectLifecycleAdapter) finalize(metadata metav1.Object, obj runtime.Ob
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
copyObj := obj.DeepCopyObject()
|
||||
if newObj, err := o.lifecycle.Finalize(copyObj); err != nil {
|
||||
if newObj != nil {
|
||||
newObj, _ := o.update(metadata.GetName(), obj, newObj)
|
||||
return newObj, false, err
|
||||
}
|
||||
return nil, false, err
|
||||
} else if newObj != nil {
|
||||
copyObj = newObj
|
||||
newObj, err := o.record(obj, o.lifecycle.Finalize)
|
||||
if err != nil {
|
||||
return obj, false, err
|
||||
}
|
||||
|
||||
newObj, err := o.removeFinalizer(o.constructFinalizerKey(), copyObj)
|
||||
return newObj, false, err
|
||||
obj, err = o.removeFinalizer(o.constructFinalizerKey(), maybeDeepCopy(obj, newObj))
|
||||
return obj, false, err
|
||||
}
|
||||
|
||||
func maybeDeepCopy(old, newObj runtime.Object) runtime.Object {
|
||||
if old == newObj {
|
||||
return old.DeepCopyObject()
|
||||
}
|
||||
return newObj
|
||||
}
|
||||
|
||||
func (o *objectLifecycleAdapter) removeFinalizer(name string, obj runtime.Object) (runtime.Object, error) {
|
||||
@@ -148,26 +157,63 @@ func (o *objectLifecycleAdapter) constructFinalizerKey() string {
|
||||
return finalizerKey + o.name
|
||||
}
|
||||
|
||||
func (o *objectLifecycleAdapter) create(metadata metav1.Object, obj runtime.Object) (runtime.Object, bool, error) {
|
||||
func (o *objectLifecycleAdapter) hasFinalize() bool {
|
||||
cond, ok := o.lifecycle.(ObjectLifecycleCondition)
|
||||
return !ok || cond.HasFinalize()
|
||||
}
|
||||
|
||||
func (o *objectLifecycleAdapter) hasCreate() bool {
|
||||
cond, ok := o.lifecycle.(ObjectLifecycleCondition)
|
||||
return !ok || cond.HasCreate()
|
||||
}
|
||||
|
||||
func (o *objectLifecycleAdapter) record(obj runtime.Object, f func(runtime.Object) (runtime.Object, error)) (runtime.Object, error) {
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return obj, err
|
||||
}
|
||||
|
||||
obj = obj.DeepCopyObject()
|
||||
if newObj, err := f(obj); err != nil {
|
||||
newObj, _ = o.update(metadata.GetName(), obj, newObj)
|
||||
return newObj, err
|
||||
} else if newObj != nil {
|
||||
newMetadata, err := meta.Accessor(newObj)
|
||||
if err != nil {
|
||||
// don't return error, no original error
|
||||
return newObj, nil
|
||||
}
|
||||
if newMetadata.GetResourceVersion() == metadata.GetResourceVersion() {
|
||||
return o.update(metadata.GetName(), obj, newObj)
|
||||
}
|
||||
return newObj, nil
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (o *objectLifecycleAdapter) create(obj runtime.Object) (runtime.Object, bool, error) {
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return obj, false, err
|
||||
}
|
||||
|
||||
if o.isInitialized(metadata) {
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
copyObj := obj.DeepCopyObject()
|
||||
copyObj, err := o.addFinalizer(copyObj)
|
||||
if err != nil {
|
||||
return copyObj, false, err
|
||||
if o.hasFinalize() {
|
||||
obj, err = o.addFinalizer(obj)
|
||||
if err != nil {
|
||||
return obj, false, err
|
||||
}
|
||||
}
|
||||
|
||||
if newObj, err := o.lifecycle.Create(copyObj); err != nil {
|
||||
newObj, _ = o.update(metadata.GetName(), obj, newObj)
|
||||
return newObj, false, err
|
||||
} else if newObj != nil {
|
||||
copyObj = newObj
|
||||
if !o.hasCreate() {
|
||||
return obj, true, err
|
||||
}
|
||||
|
||||
newObj, err := o.setInitialized(copyObj)
|
||||
return newObj, false, err
|
||||
obj, err = o.record(obj, o.lifecycle.Create)
|
||||
return obj, false, err
|
||||
}
|
||||
|
||||
func (o *objectLifecycleAdapter) isInitialized(metadata metav1.Object) bool {
|
||||
@@ -201,6 +247,12 @@ func (o *objectLifecycleAdapter) addFinalizer(obj runtime.Object) (runtime.Objec
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
obj = obj.DeepCopyObject()
|
||||
metadata, err = meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metadata.SetFinalizers(append(metadata.GetFinalizers(), o.constructFinalizerKey()))
|
||||
return o.objectClient.Update(metadata.GetName(), obj)
|
||||
}
|
||||
|
@@ -17,7 +17,9 @@ type ControllerProvider interface {
|
||||
Generic() controller.GenericController
|
||||
}
|
||||
|
||||
type Enqueuer func(namespace, name string)
|
||||
type Enqueuer interface {
|
||||
Enqueue(namespace, name string)
|
||||
}
|
||||
|
||||
type Resolver func(namespace, name string, obj runtime.Object) ([]Key, error)
|
||||
|
||||
@@ -63,7 +65,7 @@ func watch(ctx context.Context, name string, enq Enqueuer, resolve Resolver, gen
|
||||
|
||||
for _, key := range keys {
|
||||
if key.Name != "" {
|
||||
enq(key.Namespace, key.Name)
|
||||
enq.Enqueue(key.Namespace, key.Name)
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user