diff --git a/controller/generic_controller.go b/controller/generic_controller.go index 571dd949..1d74c104 100644 --- a/controller/generic_controller.go +++ b/controller/generic_controller.go @@ -2,321 +2,69 @@ package controller import ( "context" - "fmt" - "os" - "reflect" "strings" - "sync" "time" + "github.com/rancher/lasso/pkg/controller" + errors2 "github.com/pkg/errors" "github.com/rancher/norman/metrics" - "github.com/rancher/norman/objectclient" - "github.com/rancher/norman/types" "github.com/sirupsen/logrus" - "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" ) -const MetricsQueueEnv = "NORMAN_QUEUE_METRICS" -const MetricsReflectorEnv = "NORMAN_REFLECTOR_METRICS" - -var ( - resyncPeriod = 2 * time.Hour -) - -// Override the metrics providers -func init() { - if os.Getenv(MetricsQueueEnv) != "true" { - DisableControllerWorkqueuMetrics() - } - if os.Getenv(MetricsReflectorEnv) != "true" { - DisableControllerReflectorMetrics() - } -} - type HandlerFunc func(key string, obj interface{}) (interface{}, error) type GenericController interface { - SetThreadinessOverride(count int) Informer() cache.SharedIndexInformer AddHandler(ctx context.Context, name string, handler HandlerFunc) - HandlerCount() int Enqueue(namespace, name string) EnqueueAfter(namespace, name string, after time.Duration) - Sync(ctx context.Context) error - Start(ctx context.Context, threadiness int) error -} - -type Backend interface { - List(opts metav1.ListOptions) (runtime.Object, error) - Watch(opts metav1.ListOptions) (watch.Interface, error) - ObjectFactory() objectclient.ObjectFactory -} - -type handlerDef struct { - name string - generation int - handler HandlerFunc -} - -type generationKey struct { - generation int - key string } type genericController struct { - sync.Mutex - threadinessOverride int - generation int - informer cache.SharedIndexInformer - handlers []*handlerDef - preStart []string - queue workqueue.RateLimitingInterface - name string - running bool - synced bool + controller controller.SharedController + informer cache.SharedIndexInformer + name string } -func NewGenericController(name string, genericClient Backend) GenericController { - informer := cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: genericClient.List, - WatchFunc: genericClient.Watch, - }, - genericClient.ObjectFactory().Object(), resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - +func NewGenericController(name string, controller controller.SharedController) GenericController { return &genericController{ - informer: informer, - name: name, + controller: controller, + informer: controller.Informer(), + name: name, } } -func (g *genericController) SetThreadinessOverride(count int) { - g.threadinessOverride = count -} - -func (g *genericController) HandlerCount() int { - return len(g.handlers) -} - func (g *genericController) Informer() cache.SharedIndexInformer { return g.informer } func (g *genericController) Enqueue(namespace, name string) { - key := name - if namespace != "" { - key = namespace + "/" + name - } - if g.queue == nil { - g.preStart = append(g.preStart, key) - } else { - g.queue.AddRateLimited(key) - } + g.controller.Enqueue(namespace, name) } func (g *genericController) EnqueueAfter(namespace, name string, after time.Duration) { - key := name - if namespace != "" { - key = namespace + "/" + name - } - if g.queue != nil { - g.queue.AddAfter(key, after) - } + g.controller.EnqueueAfter(namespace, name, after) } func (g *genericController) AddHandler(ctx context.Context, name string, handler HandlerFunc) { - t := getHandlerTransaction(ctx) - if t == nil { - g.addHandler(ctx, name, handler) - return - } - - go func() { - if t.shouldContinue() { - g.addHandler(ctx, name, handler) + g.controller.RegisterHandler(ctx, name, controller.SharedControllerHandlerFunc(func(key string, obj runtime.Object) (runtime.Object, error) { + logrus.Tracef("%s calling handler %s %s", g.name, name, key) + metrics.IncTotalHandlerExecution(g.name, name) + result, err := handler(key, obj) + runtimeObject, _ := result.(runtime.Object) + if err != nil && !ignoreError(err, false) { + metrics.IncTotalHandlerFailure(g.name, name, key) } - }() -} - -func (g *genericController) addHandler(ctx context.Context, name string, handler HandlerFunc) { - g.Lock() - defer g.Unlock() - - g.generation++ - h := &handlerDef{ - name: name, - generation: g.generation, - handler: handler, - } - - go func(gen int) { - <-ctx.Done() - g.Lock() - defer g.Unlock() - var newHandlers []*handlerDef - for _, handler := range g.handlers { - if handler.generation == gen { - continue - } - newHandlers = append(newHandlers, handler) - } - g.handlers = newHandlers - }(h.generation) - - g.handlers = append(g.handlers, h) - - if g.synced { - for _, key := range g.informer.GetStore().ListKeys() { - g.queue.Add(key) - } - } - -} - -func (g *genericController) Sync(ctx context.Context) error { - g.Lock() - defer g.Unlock() - - return g.sync(ctx) -} - -func (g *genericController) sync(ctx context.Context) (retErr error) { - if g.synced { - return nil - } - - if g.queue == nil { - rl := workqueue.NewMaxOfRateLimiter( - workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 1000*time.Second), - // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, - ) - - g.queue = workqueue.NewNamedRateLimitingQueue(rl, g.name) - for _, key := range g.preStart { - g.queue.Add(key) - } - g.preStart = nil - - defer func() { - if retErr != nil { - g.queue.ShutDown() - } - }() - } - - defer utilruntime.HandleCrash() - - g.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: g.queueObject, - UpdateFunc: func(_, obj interface{}) { - g.queueObject(obj) - }, - DeleteFunc: g.queueObject, - }) - - logrus.Tracef("Syncing %s Controller", g.name) - - go g.informer.Run(ctx.Done()) - - if !cache.WaitForCacheSync(ctx.Done(), g.informer.HasSynced) { - return fmt.Errorf("failed to sync controller %s", g.name) - } - logrus.Tracef("Syncing %s Controller Done", g.name) - - g.synced = true - return nil -} - -func (g *genericController) Start(ctx context.Context, threadiness int) error { - g.Lock() - defer g.Unlock() - - if err := g.sync(ctx); err != nil { - return err - } - - if !g.running { - if g.threadinessOverride > 0 { - threadiness = g.threadinessOverride - } - go g.run(ctx, threadiness) - g.running = true - } - - return nil -} - -func (g *genericController) queueObject(obj interface{}) { - if _, ok := obj.(generationKey); ok { - g.queue.Add(obj) - return - } - - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err == nil { - g.queue.Add(key) - } -} - -func (g *genericController) run(ctx context.Context, threadiness int) { - defer utilruntime.HandleCrash() - defer g.queue.ShutDown() - - for i := 0; i < threadiness; i++ { - go wait.Until(g.runWorker, time.Second, ctx.Done()) - } - - <-ctx.Done() - logrus.Infof("Shutting down %s controller", g.name) -} - -func (g *genericController) runWorker() { - for g.processNextWorkItem() { - } -} - -func (g *genericController) processNextWorkItem() bool { - key, quit := g.queue.Get() - if quit { - return false - } - defer g.queue.Done(key) - - // do your work on the key. This method will contains your "do stuff" logic - err := g.syncHandler(key) - checkErr := err - if handlerErr, ok := checkErr.(*handlerError); ok { - checkErr = handlerErr.err - } - if _, ok := checkErr.(*ForgetError); err == nil || ok { - if ok { + if _, ok := err.(*ForgetError); ok { logrus.Tracef("%v %v completed with dropped err: %v", g.name, key, err) + return runtimeObject, controller.ErrIgnore } - g.queue.Forget(key) - return true - } - - if err = filterConflictsError(err); err != nil { - logrus.Errorf("%v %v %v", g.name, key, err) - } - - if gk, ok := key.(generationKey); ok { - g.queue.AddRateLimited(gk.key) - } else { - g.queue.AddRateLimited(key) - } - - return true + return runtimeObject, err + })) } func ignoreError(err error, checkString bool) bool { @@ -324,6 +72,9 @@ func ignoreError(err error, checkString bool) bool { if errors.IsConflict(err) { return true } + if err == controller.ErrIgnore { + return true + } if _, ok := err.(*ForgetError); ok { return true } @@ -332,83 +83,3 @@ func ignoreError(err error, checkString bool) bool { } return false } - -func filterConflictsError(err error) error { - if ignoreError(err, false) { - return nil - } - - if errs, ok := errors2.Cause(err).(*types.MultiErrors); ok { - var newErrors []error - for _, newError := range errs.Errors { - if !ignoreError(newError, true) { - newErrors = append(newErrors, newError) - } - } - return types.NewErrors(newErrors...) - } - - return err -} - -func (g *genericController) syncHandler(key interface{}) (err error) { - defer utilruntime.RecoverFromPanic(&err) - - generation := -1 - var s string - var obj interface{} - - switch v := key.(type) { - case string: - s = v - case generationKey: - generation = v.generation - s = v.key - default: - return nil - } - - obj, exists, err := g.informer.GetStore().GetByKey(s) - if err != nil { - return err - } else if !exists { - obj = nil - } - - var errs []error - for _, handler := range g.handlers { - if generation > -1 && handler.generation != generation { - continue - } - - logrus.Tracef("%s calling handler %s %s", g.name, handler.name, s) - metrics.IncTotalHandlerExecution(g.name, handler.name) - var newObj interface{} - if newObj, err = handler.handler(s, obj); err != nil { - if !ignoreError(err, false) { - metrics.IncTotalHandlerFailure(g.name, handler.name, s) - } - errs = append(errs, &handlerError{ - name: handler.name, - err: err, - }) - } else if newObj != nil && !reflect.ValueOf(newObj).IsNil() { - obj = newObj - } - } - err = types.NewErrors(errs...) - return -} - -type handlerError struct { - name string - err error -} - -func (h *handlerError) Error() string { - return fmt.Sprintf("[%s] failed with : %v", h.name, h.err) -} - -func (h *handlerError) Cause() error { - return h.err -} diff --git a/controller/noop_metrics.go b/controller/noop_metrics.go deleted file mode 100644 index 1ac89de7..00000000 --- a/controller/noop_metrics.go +++ /dev/null @@ -1,107 +0,0 @@ -package controller - -import ( - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" -) - -type noopMetric struct{} - -func (noopMetric) Inc() {} -func (noopMetric) Dec() {} -func (noopMetric) Observe(float64) {} -func (noopMetric) Set(float64) {} - -type noopWorkqueueMetricsProvider struct{} - -func (noopWorkqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewDeprecatedAddsMetric(name string) workqueue.CounterMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewDeprecatedLatencyMetric(name string) workqueue.SummaryMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewDeprecatedWorkDurationMetric(name string) workqueue.SummaryMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewDeprecatedRetriesMetric(name string) workqueue.CounterMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewDeprecatedDepthMetric(name string) workqueue.GaugeMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric { - return noopMetric{} -} - -func (noopWorkqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric { - return noopMetric{} -} - -type noopCacheMetricsProvider struct{} - -func (noopCacheMetricsProvider) NewListsMetric(name string) cache.CounterMetric { return noopMetric{} } -func (noopCacheMetricsProvider) NewListDurationMetric(name string) cache.SummaryMetric { - return noopMetric{} -} -func (noopCacheMetricsProvider) NewItemsInListMetric(name string) cache.SummaryMetric { - return noopMetric{} -} -func (noopCacheMetricsProvider) NewWatchesMetric(name string) cache.CounterMetric { return noopMetric{} } -func (noopCacheMetricsProvider) NewShortWatchesMetric(name string) cache.CounterMetric { - return noopMetric{} -} -func (noopCacheMetricsProvider) NewWatchDurationMetric(name string) cache.SummaryMetric { - return noopMetric{} -} -func (noopCacheMetricsProvider) NewItemsInWatchMetric(name string) cache.SummaryMetric { - return noopMetric{} -} -func (noopCacheMetricsProvider) NewLastResourceVersionMetric(name string) cache.GaugeMetric { - return noopMetric{} -} - -func DisableAllControllerMetrics() { - DisableControllerReflectorMetrics() - DisableControllerWorkqueuMetrics() -} - -func DisableControllerWorkqueuMetrics() { - workqueue.SetProvider(noopWorkqueueMetricsProvider{}) -} - -func DisableControllerReflectorMetrics() { - cache.SetReflectorMetricsProvider(noopCacheMetricsProvider{}) -} diff --git a/controller/starter.go b/controller/starter.go deleted file mode 100644 index 55519767..00000000 --- a/controller/starter.go +++ /dev/null @@ -1,40 +0,0 @@ -package controller - -import ( - "context" - - "golang.org/x/sync/errgroup" -) - -type Starter interface { - Sync(ctx context.Context) error - Start(ctx context.Context, threadiness int) error -} - -func SyncThenStart(ctx context.Context, threadiness int, starters ...Starter) error { - if err := Sync(ctx, starters...); err != nil { - return err - } - return Start(ctx, threadiness, starters...) -} - -func Sync(ctx context.Context, starters ...Starter) error { - eg, _ := errgroup.WithContext(ctx) - for _, starter := range starters { - func(starter Starter) { - eg.Go(func() error { - return starter.Sync(ctx) - }) - }(starter) - } - return eg.Wait() -} - -func Start(ctx context.Context, threadiness int, starters ...Starter) error { - for _, starter := range starters { - if err := starter.Start(ctx, threadiness); err != nil { - return err - } - } - return nil -} diff --git a/controller/transaction.go b/controller/transaction.go deleted file mode 100644 index aa206a71..00000000 --- a/controller/transaction.go +++ /dev/null @@ -1,47 +0,0 @@ -package controller - -import ( - "context" -) - -type hTransactionKey struct{} - -type HandlerTransaction struct { - context.Context - parent context.Context - done chan struct{} - result bool -} - -func (h *HandlerTransaction) shouldContinue() bool { - select { - case <-h.parent.Done(): - return false - case <-h.done: - return h.result - } -} - -func (h *HandlerTransaction) Commit() { - h.result = true - close(h.done) -} - -func (h *HandlerTransaction) Rollback() { - close(h.done) -} - -func NewHandlerTransaction(ctx context.Context) *HandlerTransaction { - ht := &HandlerTransaction{ - parent: ctx, - done: make(chan struct{}), - } - ctx = context.WithValue(ctx, hTransactionKey{}, ht) - ht.Context = ctx - return ht -} - -func getHandlerTransaction(ctx context.Context) *HandlerTransaction { - v, _ := ctx.Value(hTransactionKey{}).(*HandlerTransaction) - return v -} diff --git a/generator/controller_template.go b/generator/controller_template.go index c7213c0f..b00782ef 100644 --- a/generator/controller_template.go +++ b/generator/controller_template.go @@ -28,7 +28,7 @@ var ( } {{.schema.CodeName}}Resource = metav1.APIResource{ Name: "{{.schema.PluralName | toLower}}", - SingularName: "{{.schema.ID | toLower}}", + SingularName: "{{.schema.CodeName | toLower}}", {{- if eq .schema.Scope "namespace" }} Namespaced: true, {{ else }} @@ -55,11 +55,13 @@ func New{{.schema.CodeName}}(namespace, name string, obj {{.prefix}}{{.schema.Co return &obj } +{{ if eq .prefix "" }} type {{.schema.CodeName}}List struct { metav1.TypeMeta %BACK%json:",inline"%BACK% metav1.ListMeta %BACK%json:"metadata,omitempty"%BACK% Items []{{.prefix}}{{.schema.CodeName}} %BACK%json:"items"%BACK% } +{{- end }} type {{.schema.CodeName}}HandlerFunc func(key string, obj *{{.prefix}}{{.schema.CodeName}}) (runtime.Object, error) @@ -80,8 +82,6 @@ type {{.schema.CodeName}}Controller interface { AddClusterScopedFeatureHandler(ctx context.Context, enabled func() bool, name, clusterName string, handler {{.schema.CodeName}}HandlerFunc) Enqueue(namespace, name string) EnqueueAfter(namespace, name string, after time.Duration) - Sync(ctx context.Context) error - Start(ctx context.Context, threadiness int) error } type {{.schema.CodeName}}Interface interface { @@ -92,8 +92,8 @@ type {{.schema.CodeName}}Interface interface { Update(*{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) Delete(name string, options *metav1.DeleteOptions) error DeleteNamespaced(namespace, name string, options *metav1.DeleteOptions) error - List(opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) - ListNamespaced(namespace string, opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) + List(opts metav1.ListOptions) (*{{.prefix}}{{.schema.CodeName}}List, error) + ListNamespaced(namespace string, opts metav1.ListOptions) (*{{.prefix}}{{.schema.CodeName}}List, error) Watch(opts metav1.ListOptions) (watch.Interface, error) DeleteCollection(deleteOpts *metav1.DeleteOptions, listOpts metav1.ListOptions) error Controller() {{.schema.CodeName}}Controller @@ -132,7 +132,7 @@ func (l *{{.schema.ID}}Lister) Get(namespace, name string) (*{{.prefix}}{{.schem if !exists { return nil, errors.NewNotFound(schema.GroupResource{ Group: {{.schema.CodeName}}GroupVersionKind.Group, - Resource: "{{.schema.ID}}", + Resource: {{.schema.CodeName}}GroupVersionResource.Resource, }, key) } return obj.(*{{.prefix}}{{.schema.CodeName}}), nil @@ -213,29 +213,16 @@ func (c {{.schema.ID}}Factory) Object() runtime.Object { } func (c {{.schema.ID}}Factory) List() runtime.Object { - return &{{.schema.CodeName}}List{} + return &{{.prefix}}{{.schema.CodeName}}List{} } func (s *{{.schema.ID}}Client) Controller() {{.schema.CodeName}}Controller { - s.client.Lock() - defer s.client.Unlock() - - c, ok := s.client.{{.schema.ID}}Controllers[s.ns] - if ok { - return c - } - genericController := controller.NewGenericController({{.schema.CodeName}}GroupVersionKind.Kind+"Controller", - s.objectClient) + s.client.controllerFactory.ForResourceKind({{.schema.CodeName}}GroupVersionResource, {{.schema.CodeName}}GroupVersionKind.Kind, {{.schema | namespaced}})) - c = &{{.schema.ID}}Controller{ + return &{{.schema.ID}}Controller{ GenericController: genericController, } - - s.client.{{.schema.ID}}Controllers[s.ns] = c - s.client.starters = append(s.client.starters, c) - - return c } type {{.schema.ID}}Client struct { @@ -269,6 +256,11 @@ func (s *{{.schema.ID}}Client) Update(o *{{.prefix}}{{.schema.CodeName}}) (*{{.p return obj.(*{{.prefix}}{{.schema.CodeName}}), err } +func (s *{{.schema.ID}}Client) UpdateStatus(o *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) { + obj, err := s.objectClient.UpdateStatus(o.Name, o) + return obj.(*{{.prefix}}{{.schema.CodeName}}), err +} + func (s *{{.schema.ID}}Client) Delete(name string, options *metav1.DeleteOptions) error { return s.objectClient.Delete(name, options) } @@ -277,14 +269,14 @@ func (s *{{.schema.ID}}Client) DeleteNamespaced(namespace, name string, options return s.objectClient.DeleteNamespaced(namespace, name, options) } -func (s *{{.schema.ID}}Client) List(opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) { +func (s *{{.schema.ID}}Client) List(opts metav1.ListOptions) (*{{.prefix}}{{.schema.CodeName}}List, error) { obj, err := s.objectClient.List(opts) - return obj.(*{{.schema.CodeName}}List), err + return obj.(*{{.prefix}}{{.schema.CodeName}}List), err } -func (s *{{.schema.ID}}Client) ListNamespaced(namespace string, opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) { +func (s *{{.schema.ID}}Client) ListNamespaced(namespace string, opts metav1.ListOptions) (*{{.prefix}}{{.schema.CodeName}}List, error) { obj, err := s.objectClient.ListNamespaced(namespace, opts) - return obj.(*{{.schema.CodeName}}List), err + return obj.(*{{.prefix}}{{.schema.CodeName}}List), err } func (s *{{.schema.ID}}Client) Watch(opts metav1.ListOptions) (watch.Interface, error) { diff --git a/generator/default.go b/generator/default.go deleted file mode 100644 index e13c43d5..00000000 --- a/generator/default.go +++ /dev/null @@ -1,66 +0,0 @@ -package generator - -import ( - "fmt" - "path" - "strings" - - "github.com/rancher/norman/types" - "k8s.io/apimachinery/pkg/runtime/schema" -) - -var ( - baseCattle = "client" - baseK8s = "apis" -) - -func DefaultGenerate(schemas *types.Schemas, pkgPath string, publicAPI bool, privateTypes map[string]bool) error { - version := getVersion(schemas) - group := strings.Split(version.Group, ".")[0] - - cattleOutputPackage := path.Join(pkgPath, baseCattle, group, version.Version) - if !publicAPI { - cattleOutputPackage = "" - } - k8sOutputPackage := path.Join(pkgPath, baseK8s, version.Group, version.Version) - - if err := Generate(schemas, privateTypes, cattleOutputPackage, k8sOutputPackage); err != nil { - return err - } - - return nil -} - -func ControllersForForeignTypes(baseOutputPackage string, gv schema.GroupVersion, nsObjs []interface{}, objs []interface{}) error { - version := gv.Version - group := gv.Group - groupPath := group - - if groupPath == "" { - groupPath = "core" - } - - k8sOutputPackage := path.Join(baseOutputPackage, baseK8s, groupPath, version) - - return GenerateControllerForTypes(&types.APIVersion{ - Version: version, - Group: group, - Path: fmt.Sprintf("/k8s/%s-%s", groupPath, version), - }, k8sOutputPackage, nsObjs, objs) -} - -func getVersion(schemas *types.Schemas) *types.APIVersion { - var version types.APIVersion - for _, schema := range schemas.Schemas() { - if version.Group == "" { - version = schema.Version - continue - } - if version.Group != schema.Version.Group || - version.Version != schema.Version.Version { - panic("schema set contains two APIVersions") - } - } - - return &version -} diff --git a/generator/funcs.go b/generator/funcs.go index 4d191c76..14af9a0c 100644 --- a/generator/funcs.go +++ b/generator/funcs.go @@ -18,6 +18,7 @@ func funcs() template.FuncMap { "hasGet": hasGet, "hasPost": hasPost, "getCollectionOutput": getCollectionOutput, + "namespaced": namespaced, } } @@ -29,6 +30,10 @@ func hasGet(schema *types.Schema) bool { return contains(schema.CollectionMethods, http.MethodGet) } +func namespaced(schema *types.Schema) bool { + return schema.Scope == types.NamespaceScope +} + func hasPost(schema *types.Schema) bool { return contains(schema.CollectionMethods, http.MethodPost) } diff --git a/generator/generator.go b/generator/generator.go index d395c18c..2ca5dcb1 100644 --- a/generator/generator.go +++ b/generator/generator.go @@ -251,20 +251,21 @@ func generateScheme(external bool, outputDir string, version *types.APIVersion, for _, schema := range schemas { if !external { names = append(names, schema.CodeName) - } - if schema.CanList(nil) == nil { + names = append(names, schema.CodeName+"List") + } else if schema.CanList(nil) == nil { names = append(names, schema.CodeName+"List") } } return typeTemplate.Execute(output, map[string]interface{}{ - "version": version, - "schemas": schemas, - "names": names, + "external": external, + "version": version, + "schemas": schemas, + "names": names, }) } -func generateK8sClient(outputDir string, version *types.APIVersion, schemas []*types.Schema) error { +func generateK8sClient(external bool, outputDir string, version *types.APIVersion, schemas []*types.Schema) error { filePath := strings.ToLower("zz_generated_k8s_client.go") output, err := os.Create(path.Join(outputDir, filePath)) if err != nil { @@ -280,8 +281,9 @@ func generateK8sClient(outputDir string, version *types.APIVersion, schemas []*t } return typeTemplate.Execute(output, map[string]interface{}{ - "version": version, - "schemas": schemas, + "external": external, + "version": version, + "schemas": schemas, }) } @@ -350,11 +352,7 @@ func GenerateControllerForTypes(version *types.APIVersion, k8sOutputPackage stri } } - if err := deepCopyGen(baseDir, k8sOutputPackage); err != nil { - return err - } - - if err := generateK8sClient(k8sDir, version, controllers); err != nil { + if err := generateK8sClient(true, k8sDir, version, controllers); err != nil { return err } @@ -429,7 +427,7 @@ func Generate(schemas *types.Schemas, privateTypes map[string]bool, cattleOutput return err } - if err := generateK8sClient(k8sDir, &controllers[0].Version, controllers); err != nil { + if err := generateK8sClient(false, k8sDir, &controllers[0].Version, controllers); err != nil { return err } @@ -481,6 +479,7 @@ func prepareDirs(dirs ...string) error { func gofmt(workDir, pkg string) error { cmd := exec.Command("goimports", "-w", "-l", "./"+pkg) + fmt.Println(cmd.Args) cmd.Dir = workDir cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr diff --git a/generator/k8s_client_template.go b/generator/k8s_client_template.go index d7ae5eef..72723a58 100644 --- a/generator/k8s_client_template.go +++ b/generator/k8s_client_template.go @@ -6,69 +6,52 @@ import ( "sync" "context" + "github.com/rancher/lasso/pkg/client" + "github.com/rancher/lasso/pkg/controller" "github.com/rancher/norman/objectclient" - "github.com/rancher/norman/objectclient/dynamic" - "github.com/rancher/norman/controller" - "github.com/rancher/norman/restwatch" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" ) -type ( - contextKeyType struct{} - contextClientsKeyType struct{} -) - type Interface interface { - RESTClient() rest.Interface - controller.Starter {{range .schemas}} {{.CodeNamePlural}}Getter{{end}} } type Client struct { - sync.Mutex - restClient rest.Interface - starters []controller.Starter - {{range .schemas}} - {{.ID}}Controllers map[string]{{.CodeName}}Controller{{end}} + controllerFactory controller.SharedControllerFactory + clientFactory client.SharedClientFactory } -func NewForConfig(config rest.Config) (Interface, error) { - if config.NegotiatedSerializer == nil { - config.NegotiatedSerializer = dynamic.NegotiatedSerializer +{{ if not .external }} +func NewForConfig(cfg rest.Config) (Interface, error) { + scheme := runtime.NewScheme() + if err := AddToScheme(scheme); err != nil { + return nil, err } - - restClient, err := restwatch.UnversionedRESTClientFor(&config) + controllerFactory, err := controller.NewSharedControllerFactoryFromConfig(&cfg, scheme) if err != nil { return nil, err } + return NewFromControllerFactory(controllerFactory) +} +{{ end }} +func NewFromControllerFactory(factory controller.SharedControllerFactory) (Interface, error) { return &Client{ - restClient: restClient, - {{range .schemas}} - {{.ID}}Controllers: map[string]{{.CodeName}}Controller{},{{end}} + controllerFactory: factory, + clientFactory: factory.SharedCacheFactory().SharedClientFactory(), }, nil } -func (c *Client) RESTClient() rest.Interface { - return c.restClient -} - -func (c *Client) Sync(ctx context.Context) error { - return controller.Sync(ctx, c.starters...) -} - -func (c *Client) Start(ctx context.Context, threadiness int) error { - return controller.Start(ctx, threadiness, c.starters...) -} - {{range .schemas}} type {{.CodeNamePlural}}Getter interface { {{.CodeNamePlural}}(namespace string) {{.CodeName}}Interface } func (c *Client) {{.CodeNamePlural}}(namespace string) {{.CodeName}}Interface { - objectClient := objectclient.NewObjectClient(namespace, c.restClient, &{{.CodeName}}Resource, {{.CodeName}}GroupVersionKind, {{.ID}}Factory{}) + sharedClient := c.clientFactory.ForResourceKind({{.CodeName}}GroupVersionResource, {{.CodeName}}GroupVersionKind.Kind, {{ . | namespaced }}) + objectClient := objectclient.NewObjectClient(namespace, sharedClient, &{{.CodeName}}Resource, {{.CodeName}}GroupVersionKind, {{.ID}}Factory{}) return &{{.ID}}Client{ ns: namespace, client: c, diff --git a/generator/scheme_template.go b/generator/scheme_template.go index 031f55d6..bb45db17 100644 --- a/generator/scheme_template.go +++ b/generator/scheme_template.go @@ -26,6 +26,7 @@ func Resource(resource string) schema.GroupResource { return SchemeGroupVersion.WithResource(resource).GroupResource() } +{{- if not .external }} var ( SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) AddToScheme = SchemeBuilder.AddToScheme @@ -41,4 +42,5 @@ func addKnownTypes(scheme *runtime.Scheme) error { metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil } +{{- end }} ` diff --git a/objectclient/object_client.go b/objectclient/object_client.go index fc251b1b..cb030db8 100644 --- a/objectclient/object_client.go +++ b/objectclient/object_client.go @@ -2,25 +2,16 @@ package objectclient import ( "context" - "encoding/json" - "net/http" - "strings" "github.com/pkg/errors" - "github.com/rancher/norman/restwatch" + "github.com/rancher/lasso/pkg/client" "github.com/sirupsen/logrus" - k8sError "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - json2 "k8s.io/apimachinery/pkg/runtime/serializer/json" - "k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/rest" - restclientwatch "k8s.io/client-go/rest/watch" ) type ObjectFactory interface { @@ -46,6 +37,7 @@ type GenericClient interface { GetNamespaced(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) Get(name string, opts metav1.GetOptions) (runtime.Object, error) Update(name string, o runtime.Object) (runtime.Object, error) + UpdateStatus(name string, o runtime.Object) (runtime.Object, error) DeleteNamespaced(namespace, name string, opts *metav1.DeleteOptions) error Delete(name string, opts *metav1.DeleteOptions) error List(opts metav1.ListOptions) (runtime.Object, error) @@ -57,30 +49,33 @@ type GenericClient interface { } type ObjectClient struct { - restClient rest.Interface - resource *metav1.APIResource - gvk schema.GroupVersionKind - ns string - Factory ObjectFactory + ctx context.Context + client *client.Client + resource *metav1.APIResource + gvk schema.GroupVersionKind + ns string + Factory ObjectFactory } -func NewObjectClient(namespace string, restClient rest.Interface, apiResource *metav1.APIResource, gvk schema.GroupVersionKind, factory ObjectFactory) *ObjectClient { +func NewObjectClient(namespace string, client *client.Client, apiResource *metav1.APIResource, gvk schema.GroupVersionKind, factory ObjectFactory) *ObjectClient { return &ObjectClient{ - restClient: restClient, - resource: apiResource, - gvk: gvk, - ns: namespace, - Factory: factory, + ctx: context.TODO(), + client: client, + resource: apiResource, + gvk: gvk, + ns: namespace, + Factory: factory, } } func (p *ObjectClient) UnstructuredClient() GenericClient { return &ObjectClient{ - restClient: p.restClient, - resource: p.resource, - gvk: p.gvk, - ns: p.ns, - Factory: &UnstructuredObjectFactory{}, + ctx: p.ctx, + client: p.client, + resource: p.resource, + gvk: p.gvk, + ns: p.ns, + Factory: &UnstructuredObjectFactory{}, } } @@ -97,78 +92,24 @@ func (p *ObjectClient) getAPIPrefix() string { func (p *ObjectClient) Create(o runtime.Object) (runtime.Object, error) { ns := p.ns - obj, ok := o.(metav1.Object) - if ok && obj.GetNamespace() != "" { + if obj, ok := o.(metav1.Object); ok && obj.GetNamespace() != "" { ns = obj.GetNamespace() } - - if ok { - labels := obj.GetLabels() - if labels == nil { - labels = make(map[string]string) - } else { - ls := make(map[string]string) - for k, v := range labels { - ls[k] = v - } - labels = ls - - } - labels["cattle.io/creator"] = "norman" - obj.SetLabels(labels) - } - - if t, err := meta.TypeAccessor(o); err == nil { - if t.GetKind() == "" { - t.SetKind(p.gvk.Kind) - } - if t.GetAPIVersion() == "" { - apiVersion, _ := p.gvk.ToAPIVersionAndKind() - t.SetAPIVersion(apiVersion) - } - } - result := p.Factory.Object() logrus.Tracef("REST CREATE %s/%s/%s/%s/%s", p.getAPIPrefix(), p.gvk.Group, p.gvk.Version, ns, p.resource.Name) - err := p.restClient.Post(). - Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). - NamespaceIfScoped(ns, p.resource.Namespaced). - Resource(p.resource.Name). - Body(o). - Do(context.TODO()). - Into(result) - return result, err + result := p.ObjectFactory().Object() + return result, p.client.Create(p.ctx, ns, o, result, metav1.CreateOptions{}) } func (p *ObjectClient) GetNamespaced(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) { - result := p.Factory.Object() - req := p.restClient.Get(). - Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version) - if namespace != "" { - req = req.Namespace(namespace) - } - err := req. - Resource(p.resource.Name). - VersionedParams(&opts, metav1.ParameterCodec). - Name(name). - Do(context.TODO()). - Into(result) logrus.Tracef("REST GET %s/%s/%s/%s/%s/%s", p.getAPIPrefix(), p.gvk.Group, p.gvk.Version, namespace, p.resource.Name, name) - return result, err - + result := p.Factory.Object() + return result, p.client.Get(p.ctx, namespace, name, result, opts) } func (p *ObjectClient) Get(name string, opts metav1.GetOptions) (runtime.Object, error) { - result := p.Factory.Object() - err := p.restClient.Get(). - Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). - NamespaceIfScoped(p.ns, p.resource.Namespaced). - Resource(p.resource.Name). - VersionedParams(&opts, metav1.ParameterCodec). - Name(name). - Do(context.TODO()). - Into(result) logrus.Tracef("REST GET %s/%s/%s/%s/%s/%s", p.getAPIPrefix(), p.gvk.Group, p.gvk.Version, p.ns, p.resource.Name, name) - return result, err + result := p.Factory.Object() + return result, p.client.Get(p.ctx, p.ns, name, result, opts) } func (p *ObjectClient) Update(name string, o runtime.Object) (runtime.Object, error) { @@ -181,130 +122,59 @@ func (p *ObjectClient) Update(name string, o runtime.Object) (runtime.Object, er return result, errors.New("object missing name") } logrus.Tracef("REST UPDATE %s/%s/%s/%s/%s/%s", p.getAPIPrefix(), p.gvk.Group, p.gvk.Version, ns, p.resource.Name, name) - err := p.restClient.Put(). - Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). - NamespaceIfScoped(ns, p.resource.Namespaced). - Resource(p.resource.Name). - Name(name). - Body(o). - Do(context.TODO()). - Into(result) - return result, err + return result, p.client.Update(p.ctx, ns, o, result, metav1.UpdateOptions{}) +} + +func (p *ObjectClient) UpdateStatus(name string, o runtime.Object) (runtime.Object, error) { + ns := p.ns + if obj, ok := o.(metav1.Object); ok && obj.GetNamespace() != "" { + ns = obj.GetNamespace() + } + result := p.Factory.Object() + if len(name) == 0 { + return result, errors.New("object missing name") + } + logrus.Tracef("REST UPDATE %s/%s/%s/%s/status/%s/%s", p.getAPIPrefix(), p.gvk.Group, p.gvk.Version, ns, p.resource.Name, name) + return result, p.client.UpdateStatus(p.ctx, ns, o, result, metav1.UpdateOptions{}) } func (p *ObjectClient) DeleteNamespaced(namespace, name string, opts *metav1.DeleteOptions) error { - req := p.restClient.Delete(). - Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version) - if namespace != "" { - req = req.Namespace(namespace) - } logrus.Tracef("REST DELETE %s/%s/%s/%s/%s/%s", p.getAPIPrefix(), p.gvk.Group, p.gvk.Version, namespace, p.resource.Name, name) - return req.Resource(p.resource.Name). - Name(name). - Body(opts). - Do(context.TODO()). - Error() + if opts == nil { + opts = &metav1.DeleteOptions{} + } + return p.client.Delete(p.ctx, namespace, name, *opts) } func (p *ObjectClient) Delete(name string, opts *metav1.DeleteOptions) error { logrus.Tracef("REST DELETE %s/%s/%s/%s/%s/%s", p.getAPIPrefix(), p.gvk.Group, p.gvk.Version, p.ns, p.resource.Name, name) - return p.restClient.Delete(). - Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). - NamespaceIfScoped(p.ns, p.resource.Namespaced). - Resource(p.resource.Name). - Name(name). - Body(opts). - Do(context.TODO()). - Error() + if opts == nil { + opts = &metav1.DeleteOptions{} + } + return p.client.Delete(p.ctx, p.ns, name, *opts) } func (p *ObjectClient) List(opts metav1.ListOptions) (runtime.Object, error) { result := p.Factory.List() logrus.Tracef("REST LIST %s/%s/%s/%s/%s", p.getAPIPrefix(), p.gvk.Group, p.gvk.Version, p.ns, p.resource.Name) - return result, p.restClient.Get(). - Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). - NamespaceIfScoped(p.ns, p.resource.Namespaced). - Resource(p.resource.Name). - VersionedParams(&opts, metav1.ParameterCodec). - Do(context.TODO()). - Into(result) + return result, p.client.List(p.ctx, p.ns, result, opts) } func (p *ObjectClient) ListNamespaced(namespace string, opts metav1.ListOptions) (runtime.Object, error) { result := p.Factory.List() logrus.Tracef("REST LIST %s/%s/%s/%s/%s", p.getAPIPrefix(), p.gvk.Group, p.gvk.Version, namespace, p.resource.Name) - return result, p.restClient.Get(). - Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). - NamespaceIfScoped(namespace, p.resource.Namespaced). - Resource(p.resource.Name). - VersionedParams(&opts, metav1.ParameterCodec). - Do(context.TODO()). - Into(result) + return result, p.client.List(p.ctx, namespace, result, opts) } func (p *ObjectClient) Watch(opts metav1.ListOptions) (watch.Interface, error) { - restClient := p.restClient - if watchClient, ok := restClient.(restwatch.WatchClient); ok { - restClient = watchClient.WatchClient() - } - - r, err := restClient.Get(). - Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). - Prefix("watch"). - NamespaceIfScoped(p.ns, p.resource.Namespaced). - Resource(p.resource.Name). - VersionedParams(&opts, metav1.ParameterCodec). - Stream(context.TODO()) - if err != nil { - return nil, err - } - - embeddedDecoder := &structuredDecoder{ - factory: p.Factory, - } - streamDecoder := streaming.NewDecoder(json2.Framer.NewFrameReader(r), embeddedDecoder) - decoder := restclientwatch.NewDecoder(streamDecoder, embeddedDecoder) - return watch.NewStreamWatcher(decoder, k8sError.NewClientErrorReporter(http.StatusInternalServerError, "watch", "ClientWatchDecoding")), nil -} - -type structuredDecoder struct { - factory ObjectFactory -} - -func (d *structuredDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { - if into == nil { - into = d.factory.Object() - } - - err := json.Unmarshal(data, &into) - if err != nil { - status := &metav1.Status{} - if err := json.Unmarshal(data, status); err == nil && strings.ToLower(status.Kind) == "status" { - return status, defaults, nil - } - return nil, nil, err - } - - if _, ok := into.(*metav1.Status); !ok && strings.ToLower(into.GetObjectKind().GroupVersionKind().Kind) == "status" { - into = &metav1.Status{} - err := json.Unmarshal(data, into) - if err != nil { - return nil, nil, err - } - } - - return into, defaults, err + return p.client.Watch(p.ctx, p.ns, opts) } func (p *ObjectClient) DeleteCollection(deleteOptions *metav1.DeleteOptions, listOptions metav1.ListOptions) error { - return p.restClient.Delete(). - Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). - NamespaceIfScoped(p.ns, p.resource.Namespaced). - Resource(p.resource.Name). - VersionedParams(&listOptions, metav1.ParameterCodec). - Body(deleteOptions). - Do(context.TODO()). - Error() + if deleteOptions == nil { + deleteOptions = &metav1.DeleteOptions{} + } + return p.client.DeleteCollection(p.ctx, p.ns, *deleteOptions, listOptions) } func (p *ObjectClient) Patch(name string, o runtime.Object, patchType types.PatchType, data []byte, subresources ...string) (runtime.Object, error) { @@ -316,16 +186,7 @@ func (p *ObjectClient) Patch(name string, o runtime.Object, patchType types.Patc if len(name) == 0 { return result, errors.New("object missing name") } - err := p.restClient.Patch(patchType). - Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). - NamespaceIfScoped(ns, p.resource.Namespaced). - Resource(p.resource.Name). - SubResource(subresources...). - Name(name). - Body(data). - Do(context.TODO()). - Into(result) - return result, err + return result, p.client.Patch(p.ctx, ns, name, patchType, data, result, metav1.PatchOptions{}, subresources...) } func (p *ObjectClient) ObjectFactory() ObjectFactory { diff --git a/store/crd/init.go b/store/crd/init.go index b9d1ccdb..af5f5d3f 100644 --- a/store/crd/init.go +++ b/store/crd/init.go @@ -8,7 +8,6 @@ import ( "github.com/rancher/norman/store/proxy" "github.com/rancher/norman/types" - "github.com/rancher/norman/types/convert" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" apiext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" @@ -181,7 +180,7 @@ func (f *Factory) createCRD(ctx context.Context, apiClient clientset.Interface, Version: schema.Version.Version, Names: apiext.CustomResourceDefinitionNames{ Plural: plural, - Kind: convert.Capitalize(schema.ID), + Kind: schema.CodeName, }, }, }