From 1fd5af67a4f62d787b053e71f16a5087ca7b1dbc Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Fri, 11 Jan 2019 16:55:37 -0700 Subject: [PATCH] Allow discovery of API clients --- build.go | 2 + pkg/objectset/desiredset.go | 22 +++-- pkg/objectset/desiredset_apply.go | 13 ++- pkg/objectset/desiredset_process.go | 122 +++++++++++++++++++++++----- pkg/objectset/template.go | 32 ++++++-- 5 files changed, 148 insertions(+), 43 deletions(-) diff --git a/build.go b/build.go index 73c1d911..b5d6c9ae 100644 --- a/build.go +++ b/build.go @@ -204,6 +204,8 @@ func (c *Config) defaults(ctx context.Context, r *Runtime, opts Options) (contex } } + r.LocalConfig = c.Config + if c.ClientGetter == nil { cg, err := proxy.NewClientGetterFromConfig(*c.Config) if err != nil { diff --git a/pkg/objectset/desiredset.go b/pkg/objectset/desiredset.go index 558d0e89..aef6bc72 100644 --- a/pkg/objectset/desiredset.go +++ b/pkg/objectset/desiredset.go @@ -1,21 +1,27 @@ package objectset import ( + "github.com/rancher/norman/objectclient" "github.com/rancher/norman/pkg/objectset/injectors" "github.com/rancher/norman/types" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" ) type DesiredSet struct { - remove bool - setID string - objs *ObjectSet - codeVersion string - clients map[schema.GroupVersionKind]Client - owner runtime.Object - injectors []injectors.ConfigInjector - errs []error + discoveredClients map[schema.GroupVersionKind]*objectclient.ObjectClient + discovery discovery.DiscoveryInterface + restConfig rest.Config + remove bool + setID string + objs *ObjectSet + codeVersion string + clients map[schema.GroupVersionKind]Client + owner runtime.Object + injectors []injectors.ConfigInjector + errs []error } func (o *DesiredSet) AddInjector(inj injectors.ConfigInjector) { diff --git a/pkg/objectset/desiredset_apply.go b/pkg/objectset/desiredset_apply.go index 8dd5c93a..5dd01ff2 100644 --- a/pkg/objectset/desiredset_apply.go +++ b/pkg/objectset/desiredset_apply.go @@ -7,15 +7,15 @@ import ( "sort" "sync" - "k8s.io/apimachinery/pkg/selection" - "k8s.io/client-go/util/flowcontrol" - "github.com/pkg/errors" + errors2 "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/client-go/util/flowcontrol" ) const ( @@ -27,8 +27,7 @@ const ( ) var ( - ErrObjectSetDelay = errors.New("delaying object set apply") - hashOrder = []string{ + hashOrder = []string{ LabelID, LabelGVK, LabelName, @@ -48,7 +47,7 @@ func (o *DesiredSet) getRateLimit(inputID string) flowcontrol.RateLimiter { } else { rl = rls[inputID] if rl == nil { - rl = flowcontrol.NewTokenBucketRateLimiter(1.0/60.0, 10) + rl = flowcontrol.NewTokenBucketRateLimiter(4.0/60.0, 10) rls[inputID] = rl } } @@ -68,7 +67,7 @@ func (o *DesiredSet) Apply() error { rl := o.getRateLimit(labelSet[LabelHash]) if rl != nil && !rl.TryAccept() { - return ErrObjectSetDelay + return errors2.NewConflict(schema.GroupResource{}, o.setID, errors.New("delaying object set")) } inputID := o.inputID(labelSet[LabelHash]) diff --git a/pkg/objectset/desiredset_process.go b/pkg/objectset/desiredset_process.go index b4db1815..b4e5b8d7 100644 --- a/pkg/objectset/desiredset_process.go +++ b/pkg/objectset/desiredset_process.go @@ -4,13 +4,17 @@ import ( "fmt" "sort" - errors2 "k8s.io/apimachinery/pkg/api/errors" - "github.com/pkg/errors" + "github.com/rancher/norman/controller" + "github.com/rancher/norman/objectclient" + "github.com/rancher/norman/objectclient/dynamic" + "github.com/rancher/norman/restwatch" "github.com/rancher/norman/types" "github.com/sirupsen/logrus" + errors2 "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -21,16 +25,60 @@ var ( deletePolicy = v1.DeletePropagationBackground ) -func (o *DesiredSet) process(inputID, debugID string, set labels.Selector, gvk schema.GroupVersionKind, objs map[objectKey]runtime.Object) { +func (o *DesiredSet) getControllerAndObjectClient(debugID string, gvk schema.GroupVersionKind) (controller.GenericController, *objectclient.ObjectClient, error) { client, ok := o.clients[gvk] - if !ok { - o.err(fmt.Errorf("failed to find client for %s for %s", gvk, debugID)) + if !ok && o.discovery == nil { + return nil, nil, fmt.Errorf("failed to find client for %s for %s", gvk, debugID) + } + + if client != nil { + return client.Generic(), client.ObjectClient(), nil + } + + objectClient := o.discoveredClients[gvk] + if objectClient != nil { + return nil, objectClient, nil + } + + resources, err := o.discovery.ServerResourcesForGroupVersion(gvk.GroupVersion().String()) + if err != nil { + return nil, nil, err + } + + for _, resource := range resources.APIResources { + if resource.Kind != gvk.Kind { + continue + } + + restConfig := o.restConfig + if restConfig.NegotiatedSerializer == nil { + restConfig.NegotiatedSerializer = dynamic.NegotiatedSerializer + } + + restClient, err := restwatch.UnversionedRESTClientFor(&restConfig) + if err != nil { + return nil, nil, err + } + + objectClient := objectclient.NewObjectClient("", restClient, &resource, gvk, &objectclient.UnstructuredObjectFactory{}) + if o.discoveredClients == nil { + o.discoveredClients = map[schema.GroupVersionKind]*objectclient.ObjectClient{} + } + o.discoveredClients[gvk] = objectClient + return nil, objectClient, nil + } + + return nil, nil, fmt.Errorf("failed to discover client for %s for %s", gvk, debugID) +} + +func (o *DesiredSet) process(inputID, debugID string, set labels.Selector, gvk schema.GroupVersionKind, objs map[objectKey]runtime.Object) { + controller, objectClient, err := o.getControllerAndObjectClient(debugID, gvk) + if err != nil { + o.err(err) return } - indexer := client.Generic().Informer().GetIndexer() - - existing, err := list(indexer, set) + existing, err := list(controller, objectClient, set) if err != nil { o.err(fmt.Errorf("failed to list %s for %s", gvk, debugID)) return @@ -45,10 +93,10 @@ func (o *DesiredSet) process(inputID, debugID string, set labels.Selector, gvk s continue } - _, err = client.ObjectClient().Create(obj) + _, err = objectClient.Create(obj) if errors2.IsAlreadyExists(err) { // Taking over an object that wasn't previously managed by us - existingObj, err := client.ObjectClient().GetNamespaced(k.namespace, k.name, v1.GetOptions{}) + existingObj, err := objectClient.GetNamespaced(k.namespace, k.name, v1.GetOptions{}) if err == nil { toUpdate = append(toUpdate, k) existing[k] = existingObj @@ -63,7 +111,7 @@ func (o *DesiredSet) process(inputID, debugID string, set labels.Selector, gvk s } for _, k := range toUpdate { - err := o.compareObjects(client.ObjectClient(), debugID, inputID, existing[k], objs[k], len(toCreate) > 0 || len(toDelete) > 0) + err := o.compareObjects(objectClient, debugID, inputID, existing[k], objs[k], len(toCreate) > 0 || len(toDelete) > 0) if err != nil { o.err(errors.Wrapf(err, "failed to update %s %s for %s", k, gvk, debugID)) continue @@ -71,7 +119,7 @@ func (o *DesiredSet) process(inputID, debugID string, set labels.Selector, gvk s } for _, k := range toDelete { - err := client.ObjectClient().DeleteNamespaced(k.namespace, k.name, &v1.DeleteOptions{ + err := objectClient.DeleteNamespaced(k.namespace, k.name, &v1.DeleteOptions{ PropagationPolicy: &deletePolicy, }) if err != nil { @@ -110,23 +158,55 @@ func sortObjectKeys(keys []objectKey) { }) } -func list(indexer cache.Indexer, selector labels.Selector) (map[objectKey]runtime.Object, error) { +func addObjectToMap(objs map[objectKey]runtime.Object, obj interface{}) error { + metadata, err := meta.Accessor(obj) + if err != nil { + return err + } + + objs[objectKey{ + namespace: metadata.GetNamespace(), + name: metadata.GetName(), + }] = obj.(runtime.Object) + + return nil +} + +func list(controller controller.GenericController, objectClient *objectclient.ObjectClient, selector labels.Selector) (map[objectKey]runtime.Object, error) { var ( errs []error objs = map[objectKey]runtime.Object{} ) - err := cache.ListAllByNamespace(indexer, "", selector, func(obj interface{}) { - metadata, err := meta.Accessor(obj) + if controller == nil { + objList, err := objectClient.List(v1.ListOptions{ + LabelSelector: selector.String(), + }) if err != nil { - errs = append(errs, err) - return + return nil, err } - objs[objectKey{ - namespace: metadata.GetNamespace(), - name: metadata.GetName(), - }] = obj.(runtime.Object) + list, ok := objList.(*unstructured.UnstructuredList) + if !ok { + return nil, fmt.Errorf("invalid list type %T", objList) + } + if err != nil { + return nil, err + } + + for _, obj := range list.Items { + if err := addObjectToMap(objs, obj); err != nil { + errs = append(errs, err) + } + } + + return objs, nil + } + + err := cache.ListAllByNamespace(controller.Informer().GetIndexer(), "", selector, func(obj interface{}) { + if err := addObjectToMap(objs, obj); err != nil { + errs = append(errs, err) + } }) if err != nil { errs = append(errs, err) diff --git a/pkg/objectset/template.go b/pkg/objectset/template.go index 1e03daa9..0d73eecf 100644 --- a/pkg/objectset/template.go +++ b/pkg/objectset/template.go @@ -5,6 +5,8 @@ import ( "github.com/rancher/norman/objectclient" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" ) type Client interface { @@ -13,24 +15,38 @@ type Client interface { } type Processor struct { - setID string - codeVersion string - clients map[schema.GroupVersionKind]Client + setID string + codeVersion string + discovery discovery.DiscoveryInterface + restConfig rest.Config + allowSlowPath string + slowClient rest.HTTPClient + clients map[schema.GroupVersionKind]Client } -func NewProcessor(setID string) Processor { - return Processor{ +func NewProcessor(setID string) *Processor { + return &Processor{ setID: setID, clients: map[schema.GroupVersionKind]Client{}, } } -func (t Processor) CodeVersion(version string) Processor { +func (t *Processor) SetID() string { + return t.setID +} + +func (t *Processor) CodeVersion(version string) *Processor { t.codeVersion = version return t } -func (t Processor) Client(clients ...Client) Processor { +func (t *Processor) AllowDiscovery(discovery discovery.DiscoveryInterface, restConfig rest.Config) *Processor { + t.discovery = discovery + t.restConfig = restConfig + return t +} + +func (t *Processor) Client(clients ...Client) *Processor { // ensure cache is enabled for _, client := range clients { client.Generic() @@ -50,6 +66,8 @@ func (t Processor) NewDesiredSet(owner runtime.Object, objs *ObjectSet) *Desired objs = &ObjectSet{} } return &DesiredSet{ + discovery: t.discovery, + restConfig: t.restConfig, remove: remove, objs: objs, setID: t.setID,