diff --git a/api/builtin/schema.go b/api/builtin/schema.go index 10bc37fb..c49f9f29 100644 --- a/api/builtin/schema.go +++ b/api/builtin/schema.go @@ -49,7 +49,7 @@ var ( } Collection = types.Schema{ - ID: "error", + ID: "collection", Version: Version, ResourceMethods: []string{}, CollectionMethods: []string{}, diff --git a/api/handler/delete.go b/api/handler/delete.go index 348ea5e6..e64d5a3a 100644 --- a/api/handler/delete.go +++ b/api/handler/delete.go @@ -15,6 +15,6 @@ func DeleteHandler(request *types.APIContext) error { } } - request.WriteResponse(http.StatusOK, nil) + request.WriteResponse(http.StatusNoContent, nil) return nil } diff --git a/api/handler/validate.go b/api/handler/validate.go index c83b87b6..d295d0bd 100644 --- a/api/handler/validate.go +++ b/api/handler/validate.go @@ -15,9 +15,12 @@ func ParseAndValidateBody(apiContext *types.APIContext) (map[string]interface{}, b := builder.NewBuilder(apiContext) data, err = b.Construct(apiContext.Schema, data, builder.Create) - validator := apiContext.Schema.Validator - if validator != nil { - if err := validator(apiContext, data); err != nil { + if err != nil { + return nil, err + } + + if apiContext.Schema.Validator != nil { + if err := apiContext.Schema.Validator(apiContext, data); err != nil { return nil, err } } diff --git a/api/writer/html.go b/api/writer/html.go index f12a0aa3..cc202d15 100644 --- a/api/writer/html.go +++ b/api/writer/html.go @@ -40,7 +40,7 @@ func (h *HTMLResponseWriter) Write(apiContext *types.APIContext, code int, obj i headerString := strings.Replace(start, "%SCHEMAS%", apiContext.URLBuilder.Collection(schemaSchema, apiContext.Version), 1) apiContext.Response.Write([]byte(headerString)) } - h.Body(apiContext, code, obj) + h.Body(apiContext, apiContext.Response, obj) if schemaSchema != nil { apiContext.Response.Write(end) } diff --git a/api/writer/json.go b/api/writer/json.go index a5873638..509d1cd7 100644 --- a/api/writer/json.go +++ b/api/writer/json.go @@ -5,6 +5,8 @@ import ( "fmt" "net/http" + "io" + "github.com/rancher/norman/parse" "github.com/rancher/norman/parse/builder" "github.com/rancher/norman/types" @@ -21,13 +23,19 @@ func (j *JSONResponseWriter) start(apiContext *types.APIContext, code int, obj i func (j *JSONResponseWriter) Write(apiContext *types.APIContext, code int, obj interface{}) { j.start(apiContext, code, obj) - j.Body(apiContext, code, obj) + j.Body(apiContext, apiContext.Response, obj) } -func (j *JSONResponseWriter) Body(apiContext *types.APIContext, code int, obj interface{}) { +func (j *JSONResponseWriter) Body(apiContext *types.APIContext, writer io.Writer, obj interface{}) error { + return j.VersionBody(apiContext, apiContext.Version, writer, obj) + +} + +func (j *JSONResponseWriter) VersionBody(apiContext *types.APIContext, version *types.APIVersion, writer io.Writer, obj interface{}) error { var output interface{} builder := builder.NewBuilder(apiContext) + builder.Version = version switch v := obj.(type) { case []interface{}: @@ -41,8 +49,10 @@ func (j *JSONResponseWriter) Body(apiContext *types.APIContext, code int, obj in } if output != nil { - json.NewEncoder(apiContext.Response).Encode(output) + return json.NewEncoder(writer).Encode(output) } + + return nil } func (j *JSONResponseWriter) writeMapSlice(builder *builder.Builder, apiContext *types.APIContext, input []map[string]interface{}) *types.GenericCollection { collection := newCollection(apiContext) diff --git a/clientbase/object_client.go b/clientbase/object_client.go index 9b45700e..edef0dc1 100644 --- a/clientbase/object_client.go +++ b/clientbase/object_client.go @@ -117,7 +117,6 @@ func (p *ObjectClient) Watch(opts metav1.ListOptions) (watch.Interface, error) { r, err := p.restClient.Get(). Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version). Prefix("watch"). - Namespace(p.ns). NamespaceIfScoped(p.ns, p.resource.Namespaced). Resource(p.resource.Name). VersionedParams(&opts, dynamic.VersionedParameterEncoderWithV1Fallback). diff --git a/controller/generic_controller.go b/controller/generic_controller.go index 6129f445..a1f4816f 100644 --- a/controller/generic_controller.go +++ b/controller/generic_controller.go @@ -24,7 +24,9 @@ type HandlerFunc func(key string) error type GenericController interface { Informer() cache.SharedIndexInformer AddHandler(handler HandlerFunc) + HandlerCount() int Enqueue(namespace, name string) + Sync(ctx context.Context) error Start(ctx context.Context, threadiness int) error } @@ -35,6 +37,7 @@ type genericController struct { queue workqueue.RateLimitingInterface name string running bool + synced bool } func NewGenericController(name string, objectClient *clientbase.ObjectClient) GenericController { @@ -53,6 +56,10 @@ func NewGenericController(name string, objectClient *clientbase.ObjectClient) Ge } } +func (g *genericController) HandlerCount() int { + return len(g.handlers) +} + func (g *genericController) Informer() cache.SharedIndexInformer { return g.informer } @@ -69,10 +76,50 @@ func (g *genericController) AddHandler(handler HandlerFunc) { g.handlers = append(g.handlers, handler) } +func (g *genericController) Sync(ctx context.Context) error { + g.Lock() + defer g.Unlock() + + return g.sync(ctx) +} + +func (g *genericController) sync(ctx context.Context) error { + if g.synced { + return nil + } + + defer utilruntime.HandleCrash() + + g.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: g.queueObject, + UpdateFunc: func(_, obj interface{}) { + g.queueObject(obj) + }, + DeleteFunc: g.queueObject, + }) + + logrus.Infof("Starting %s Controller", g.name) + + go g.informer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), g.informer.HasSynced) { + return fmt.Errorf("failed to sync controller %s", g.name) + } + + g.synced = true + return nil +} + func (g *genericController) Start(ctx context.Context, threadiness int) error { g.Lock() defer g.Unlock() + if !g.synced { + if err := g.sync(ctx); err != nil { + return err + } + } + if !g.running { go g.run(ctx, threadiness) } @@ -92,22 +139,6 @@ func (g *genericController) run(ctx context.Context, threadiness int) { defer utilruntime.HandleCrash() defer g.queue.ShutDown() - g.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: g.queueObject, - UpdateFunc: func(_, obj interface{}) { - g.queueObject(obj) - }, - DeleteFunc: g.queueObject, - }) - - logrus.Infof("Starting %s Controller", g.name) - - go g.informer.Run(ctx.Done()) - - if !cache.WaitForCacheSync(ctx.Done(), g.informer.HasSynced) { - return - } - for i := 0; i < threadiness; i++ { go wait.Until(g.runWorker, time.Second, ctx.Done()) } diff --git a/controller/starter.go b/controller/starter.go new file mode 100644 index 00000000..3a735de9 --- /dev/null +++ b/controller/starter.go @@ -0,0 +1,33 @@ +package controller + +import ( + "context" + + "golang.org/x/sync/errgroup" +) + +type Starter interface { + Sync(ctx context.Context) error + Start(ctx context.Context, threadiness int) error +} + +func Sync(ctx context.Context, starters ...Starter) error { + eg, ctx := errgroup.WithContext(ctx) + for _, starter := range starters { + func(starter Starter) { + eg.Go(func() error { + return starter.Sync(ctx) + }) + }(starter) + } + return eg.Wait() +} + +func Start(ctx context.Context, threadiness int, starters ...Starter) error { + for _, starter := range starters { + if err := starter.Start(ctx, threadiness); err != nil { + return err + } + } + return nil +} diff --git a/example/main.go b/example/main.go index 5b195d1c..3850e107 100644 --- a/example/main.go +++ b/example/main.go @@ -1,13 +1,14 @@ package main import ( - "context" "fmt" "net/http" "os" - "github.com/rancher/norman/server" + "github.com/rancher/norman/api" + "github.com/rancher/norman/store/crd" "github.com/rancher/norman/types" + "k8s.io/client-go/tools/clientcmd" ) type Foo struct { @@ -32,15 +33,25 @@ var ( ) func main() { - if _, err := Schemas.Import(&version, Foo{}); err != nil { + kubeConfig, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG")) + if err != nil { panic(err) } - server, err := server.NewAPIServer(context.Background(), os.Getenv("KUBECONFIG"), Schemas) + store, err := crd.NewCRDStoreFromConfig(*kubeConfig) if err != nil { panic(err) } + Schemas.MustImportAndCustomize(&version, Foo{}, func(schema *types.Schema) { + schema.Store = store + }) + + server := api.NewAPIServer() + if err := server.AddSchemas(Schemas); err != nil { + panic(err) + } + fmt.Println("Listening on 0.0.0.0:1234") http.ListenAndServe("0.0.0.0:1234", server) } diff --git a/generator/controller_template.go b/generator/controller_template.go index d370235f..111fa792 100644 --- a/generator/controller_template.go +++ b/generator/controller_template.go @@ -4,9 +4,9 @@ var controllerTemplate = `package {{.schema.Version.Version}} import ( "sync" - "context" + {{.importPackage}} "github.com/rancher/norman/clientbase" "github.com/rancher/norman/controller" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -26,7 +26,11 @@ var ( {{.schema.CodeName}}Resource = metav1.APIResource{ Name: "{{.schema.PluralName | toLower}}", SingularName: "{{.schema.ID | toLower}}", +{{- if eq .schema.Scope "namespace" }} + Namespaced: true, +{{ else }} Namespaced: false, +{{- end }} Kind: {{.schema.CodeName}}GroupVersionKind.Kind, } ) @@ -34,33 +38,70 @@ var ( type {{.schema.CodeName}}List struct { metav1.TypeMeta %BACK%json:",inline"%BACK% metav1.ListMeta %BACK%json:"metadata,omitempty"%BACK% - Items []{{.schema.CodeName}} + Items []{{.prefix}}{{.schema.CodeName}} } -type {{.schema.CodeName}}HandlerFunc func(key string, obj *{{.schema.CodeName}}) error +type {{.schema.CodeName}}HandlerFunc func(key string, obj *{{.prefix}}{{.schema.CodeName}}) error + +type {{.schema.CodeName}}Lister interface { + List(namespace string, selector labels.Selector) (ret []*{{.prefix}}{{.schema.CodeName}}, err error) + Get(namespace, name string) (*{{.prefix}}{{.schema.CodeName}}, error) +} type {{.schema.CodeName}}Controller interface { Informer() cache.SharedIndexInformer + Lister() {{.schema.CodeName}}Lister AddHandler(handler {{.schema.CodeName}}HandlerFunc) Enqueue(namespace, name string) + Sync(ctx context.Context) error Start(ctx context.Context, threadiness int) error } type {{.schema.CodeName}}Interface interface { - Create(*{{.schema.CodeName}}) (*{{.schema.CodeName}}, error) - Get(name string, opts metav1.GetOptions) (*{{.schema.CodeName}}, error) - Update(*{{.schema.CodeName}}) (*{{.schema.CodeName}}, error) + ObjectClient() *clientbase.ObjectClient + Create(*{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) + Get(name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) + Update(*{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) Delete(name string, options *metav1.DeleteOptions) error - List(opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) + List(opts metav1.ListOptions) (*{{.prefix}}{{.schema.CodeName}}List, error) Watch(opts metav1.ListOptions) (watch.Interface, error) DeleteCollection(deleteOpts *metav1.DeleteOptions, listOpts metav1.ListOptions) error Controller() {{.schema.CodeName}}Controller } +type {{.schema.ID}}Lister struct { + controller *{{.schema.ID}}Controller +} + +func (l *{{.schema.ID}}Lister) List(namespace string, selector labels.Selector) (ret []*{{.prefix}}{{.schema.CodeName}}, err error) { + err = cache.ListAllByNamespace(l.controller.Informer().GetIndexer(), namespace, selector, func(obj interface{}) { + ret = append(ret, obj.(*{{.prefix}}{{.schema.CodeName}})) + }) + return +} + +func (l *{{.schema.ID}}Lister) Get(namespace, name string) (*{{.prefix}}{{.schema.CodeName}}, error) { + obj, exists, err := l.controller.Informer().GetIndexer().GetByKey(namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1.Resource("{{.schema.ID}}"), name) + } + return obj.(*{{.prefix}}{{.schema.CodeName}}), nil +} + type {{.schema.ID}}Controller struct { controller.GenericController } +func (c *{{.schema.ID}}Controller) Lister() {{.schema.CodeName}}Lister { + return &{{.schema.ID}}Lister{ + controller: c, + } +} + + func (c *{{.schema.ID}}Controller) AddHandler(handler {{.schema.CodeName}}HandlerFunc) { c.GenericController.AddHandler(func(key string) error { obj, exists, err := c.Informer().GetStore().GetByKey(key) @@ -70,7 +111,7 @@ func (c *{{.schema.ID}}Controller) AddHandler(handler {{.schema.CodeName}}Handle if !exists { return handler(key, nil) } - return handler(key, obj.(*{{.schema.CodeName}})) + return handler(key, obj.(*{{.prefix}}{{.schema.CodeName}})) }) } @@ -78,7 +119,7 @@ type {{.schema.ID}}Factory struct { } func (c {{.schema.ID}}Factory) Object() runtime.Object { - return &{{.schema.CodeName}}{} + return &{{.prefix}}{{.schema.CodeName}}{} } func (c {{.schema.ID}}Factory) List() runtime.Object { @@ -102,6 +143,7 @@ func (s *{{.schema.ID}}Client) Controller() {{.schema.CodeName}}Controller { } s.client.{{.schema.ID}}Controllers[s.ns] = c + s.client.starters = append(s.client.starters, c) return c } @@ -113,28 +155,32 @@ type {{.schema.ID}}Client struct { controller {{.schema.CodeName}}Controller } -func (s *{{.schema.ID}}Client) Create(o *{{.schema.CodeName}}) (*{{.schema.CodeName}}, error) { +func (s *{{.schema.ID}}Client) ObjectClient() *clientbase.ObjectClient { + return s.objectClient +} + +func (s *{{.schema.ID}}Client) Create(o *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) { obj, err := s.objectClient.Create(o) - return obj.(*{{.schema.CodeName}}), err + return obj.(*{{.prefix}}{{.schema.CodeName}}), err } -func (s *{{.schema.ID}}Client) Get(name string, opts metav1.GetOptions) (*{{.schema.CodeName}}, error) { +func (s *{{.schema.ID}}Client) Get(name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) { obj, err := s.objectClient.Get(name, opts) - return obj.(*{{.schema.CodeName}}), err + return obj.(*{{.prefix}}{{.schema.CodeName}}), err } -func (s *{{.schema.ID}}Client) Update(o *{{.schema.CodeName}}) (*{{.schema.CodeName}}, error) { +func (s *{{.schema.ID}}Client) Update(o *{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) { obj, err := s.objectClient.Update(o.Name, o) - return obj.(*{{.schema.CodeName}}), err + return obj.(*{{.prefix}}{{.schema.CodeName}}), err } func (s *{{.schema.ID}}Client) Delete(name string, options *metav1.DeleteOptions) error { return s.objectClient.Delete(name, options) } -func (s *{{.schema.ID}}Client) List(opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) { +func (s *{{.schema.ID}}Client) List(opts metav1.ListOptions) (*{{.prefix}}{{.schema.CodeName}}List, error) { obj, err := s.objectClient.List(opts) - return obj.(*{{.schema.CodeName}}List), err + return obj.(*{{.prefix}}{{.schema.CodeName}}List), err } func (s *{{.schema.ID}}Client) Watch(opts metav1.ListOptions) (watch.Interface, error) { diff --git a/generator/generator.go b/generator/generator.go index 69675425..58da7d4f 100644 --- a/generator/generator.go +++ b/generator/generator.go @@ -1,6 +1,8 @@ package generator import ( + "fmt" + "io" "io/ioutil" "net/http" "os" @@ -10,8 +12,6 @@ import ( "strings" "text/template" - "io" - "github.com/pkg/errors" "github.com/rancher/norman/types" "github.com/rancher/norman/types/convert" @@ -135,7 +135,7 @@ func generateType(outputDir string, schema *types.Schema, schemas *types.Schemas }) } -func generateController(outputDir string, schema *types.Schema, schemas *types.Schemas) error { +func generateController(external bool, outputDir string, schema *types.Schema, schemas *types.Schemas) error { filePath := strings.ToLower("zz_generated_" + addUnderscore(schema.ID) + "_controller.go") output, err := os.Create(path.Join(outputDir, filePath)) if err != nil { @@ -150,8 +150,18 @@ func generateController(outputDir string, schema *types.Schema, schemas *types.S return err } + importPackage := "" + prefix := "" + if external { + parts := strings.Split(schema.PkgName, "/vendor/") + importPackage = fmt.Sprintf("\"%s\"", parts[len(parts)-1]) + prefix = schema.Version.Version + "." + } + return typeTemplate.Execute(output, map[string]interface{}{ - "schema": schema, + "schema": schema, + "importPackage": importPackage, + "prefix": prefix, }) } @@ -195,6 +205,36 @@ func generateClient(outputDir string, schemas []*types.Schema) error { }) } +func GenerateControllerForTypes(version *types.APIVersion, k8sOutputPackage string, objs ...interface{}) error { + baseDir := args.DefaultSourceTree() + k8sDir := path.Join(baseDir, k8sOutputPackage) + + if err := prepareDirs(k8sDir); err != nil { + return err + } + + schemas := types.NewSchemas() + var controllers []*types.Schema + + for _, obj := range objs { + schema, err := schemas.Import(version, obj) + if err != nil { + return err + } + controllers = append(controllers, schema) + + if err := generateController(true, k8sDir, schema, schemas); err != nil { + return err + } + } + + if err := deepCopyGen(baseDir, k8sOutputPackage); err != nil { + return err + } + + return generateK8sClient(k8sDir, version, controllers) +} + func Generate(schemas *types.Schemas, cattleOutputPackage, k8sOutputPackage string) error { baseDir := args.DefaultSourceTree() cattleDir := path.Join(baseDir, cattleOutputPackage) @@ -220,7 +260,7 @@ func Generate(schemas *types.Schemas, cattleOutputPackage, k8sOutputPackage stri !strings.HasPrefix(schema.PkgName, "k8s.io") && !strings.Contains(schema.PkgName, "/vendor/") { controllers = append(controllers, schema) - if err := generateController(k8sDir, schema, schemas); err != nil { + if err := generateController(false, k8sDir, schema, schemas); err != nil { return err } } diff --git a/generator/k8s_client_template.go b/generator/k8s_client_template.go index 3dc1770d..e1ec2725 100644 --- a/generator/k8s_client_template.go +++ b/generator/k8s_client_template.go @@ -6,12 +6,14 @@ import ( "sync" "github.com/rancher/norman/clientbase" + "github.com/rancher/norman/controller" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" ) type Interface interface { RESTClient() rest.Interface + controller.Starter {{range .schemas}} {{.CodeNamePlural}}Getter{{end}} } @@ -19,6 +21,7 @@ type Interface interface { type Client struct { sync.Mutex restClient rest.Interface + starters []controller.Starter {{range .schemas}} {{.ID}}Controllers map[string]{{.CodeName}}Controller{{end}} } @@ -45,6 +48,14 @@ func (c *Client) RESTClient() rest.Interface { return c.restClient } +func (c *Client) Sync(ctx context.Context) error { + return controller.Sync(ctx, c.starters...) +} + +func (c *Client) Start(ctx context.Context, threadiness int) error { + return controller.Start(ctx, threadiness, c.starters...) +} + {{range .schemas}} type {{.CodeNamePlural}}Getter interface { {{.CodeNamePlural}}(namespace string) {{.CodeName}}Interface diff --git a/parse/parse.go b/parse/parse.go index 99e6df6d..0f9936b7 100644 --- a/parse/parse.go +++ b/parse/parse.go @@ -78,7 +78,7 @@ func parseSubContext(parts []string, apiRequest *types.APIContext) []string { apiRequest.SubContext = map[string]string{} apiRequest.Attributes = map[string]interface{}{} - for len(parts) > 3 && apiRequest.Version != nil { + for len(parts) > 3 && apiRequest.Version != nil && parts[3] != "" { resourceType := parts[1] resourceID := parts[2] diff --git a/server/server.go b/server/server.go deleted file mode 100644 index dab6ba5f..00000000 --- a/server/server.go +++ /dev/null @@ -1,68 +0,0 @@ -package server - -import ( - "context" - - "github.com/pkg/errors" - "github.com/rancher/norman/api" - "github.com/rancher/norman/store/crd" - "github.com/rancher/norman/types" - "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" -) - -func NewAPIServer(ctx context.Context, kubeConfig string, schemas *types.Schemas) (*api.Server, error) { - config, err := clientcmd.BuildConfigFromFlags("", kubeConfig) - if err != nil { - return nil, errors.Wrap(err, "failed to build kubeConfig") - } - return NewAPIServerFromConfig(ctx, config, schemas) -} - -func NewClients(kubeConfig string) (rest.Interface, clientset.Interface, error) { - config, err := clientcmd.BuildConfigFromFlags("", kubeConfig) - if err != nil { - return nil, nil, err - } - return NewClientsFromConfig(config) -} - -func NewClientsFromConfig(config *rest.Config) (rest.Interface, clientset.Interface, error) { - dynamicConfig := *config - if dynamicConfig.NegotiatedSerializer == nil { - configConfig := dynamic.ContentConfig() - dynamicConfig.NegotiatedSerializer = configConfig.NegotiatedSerializer - } - - k8sClient, err := rest.UnversionedRESTClientFor(&dynamicConfig) - if err != nil { - return nil, nil, err - } - - apiExtClient, err := clientset.NewForConfig(&dynamicConfig) - if err != nil { - return nil, nil, err - } - - return k8sClient, apiExtClient, nil -} - -func NewAPIServerFromConfig(ctx context.Context, config *rest.Config, schemas *types.Schemas) (*api.Server, error) { - k8sClient, apiExtClient, err := NewClientsFromConfig(config) - if err != nil { - return nil, err - } - return NewAPIServerFromClients(ctx, k8sClient, apiExtClient, schemas) -} - -func NewAPIServerFromClients(ctx context.Context, k8sClient rest.Interface, apiExtClient clientset.Interface, schemas *types.Schemas) (*api.Server, error) { - store := crd.NewCRDStore(apiExtClient, k8sClient) - if err := store.AddSchemas(ctx, schemas); err != nil { - return nil, err - } - - server := api.NewAPIServer() - return server, server.AddSchemas(schemas) -} diff --git a/store/crd/crd_store.go b/store/crd/crd_store.go index 0f3fc093..07c7c681 100644 --- a/store/crd/crd_store.go +++ b/store/crd/crd_store.go @@ -11,10 +11,12 @@ import ( "github.com/rancher/norman/types/convert" "github.com/sirupsen/logrus" apiext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" ) @@ -25,7 +27,27 @@ type Store struct { schemaStores map[*types.Schema]*proxy.Store } -func NewCRDStore(apiExtClientSet apiextclientset.Interface, k8sClient rest.Interface) *Store { +func NewCRDStoreFromConfig(config rest.Config) (*Store, error) { + dynamicConfig := config + if dynamicConfig.NegotiatedSerializer == nil { + configConfig := dynamic.ContentConfig() + dynamicConfig.NegotiatedSerializer = configConfig.NegotiatedSerializer + } + + k8sClient, err := rest.UnversionedRESTClientFor(&dynamicConfig) + if err != nil { + return nil, err + } + + apiExtClient, err := clientset.NewForConfig(&dynamicConfig) + if err != nil { + return nil, err + } + + return NewCRDStoreFromClients(apiExtClient, k8sClient), nil +} + +func NewCRDStoreFromClients(apiExtClientSet apiextclientset.Interface, k8sClient rest.Interface) *Store { return &Store{ apiExtClientSet: apiExtClientSet, k8sClient: k8sClient, @@ -57,6 +79,14 @@ func (c *Store) List(apiContext *types.APIContext, schema *types.Schema, opt typ return store.List(apiContext, schema, opt) } +func (c *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt types.QueryOptions) (chan map[string]interface{}, error) { + store, ok := c.schemaStores[schema] + if !ok { + return nil, nil + } + return store.Watch(apiContext, schema, opt) +} + func (c *Store) Update(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}, id string) (map[string]interface{}, error) { store, ok := c.schemaStores[schema] if !ok { @@ -73,14 +103,10 @@ func (c *Store) Create(apiContext *types.APIContext, schema *types.Schema, data return store.Create(apiContext, schema, data) } -func (c *Store) AddSchemas(ctx context.Context, schemas *types.Schemas) error { - if schemas.Err() != nil { - return schemas.Err() - } - +func (c *Store) AddSchemas(ctx context.Context, schemas ...*types.Schema) error { schemaStatus := map[*types.Schema]*apiext.CustomResourceDefinition{} - for _, schema := range schemas.Schemas() { + for _, schema := range schemas { if schema.Store != nil || !contains(schema.CollectionMethods, http.MethodGet) { continue } @@ -108,7 +134,9 @@ func (c *Store) AddSchemas(ctx context.Context, schemas *types.Schemas) error { } for schema, crd := range schemaStatus { - if _, ok := ready[crd.Name]; !ok { + if crd, ok := ready[crd.Name]; ok { + schemaStatus[schema] = crd + } else { if err := c.waitCRD(ctx, crd.Name, schema, schemaStatus); err != nil { return err } @@ -171,24 +199,22 @@ func (c *Store) waitCRD(ctx context.Context, crdName string, schema *types.Schem }) } -func (c *Store) createCRD(schema *types.Schema, ready map[string]apiext.CustomResourceDefinition) (*apiext.CustomResourceDefinition, error) { +func (c *Store) createCRD(schema *types.Schema, ready map[string]*apiext.CustomResourceDefinition) (*apiext.CustomResourceDefinition, error) { plural := strings.ToLower(schema.PluralName) name := strings.ToLower(plural + "." + schema.Version.Group) crd, ok := ready[name] if ok { - return &crd, nil + return crd, nil } - crd = apiext.CustomResourceDefinition{ + crd = &apiext.CustomResourceDefinition{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, Spec: apiext.CustomResourceDefinitionSpec{ Group: schema.Version.Group, Version: schema.Version.Version, - //Scope: getScope(schema), - Scope: apiext.ClusterScoped, Names: apiext.CustomResourceDefinitionNames{ Plural: plural, Kind: convert.Capitalize(schema.ID), @@ -196,28 +222,34 @@ func (c *Store) createCRD(schema *types.Schema, ready map[string]apiext.CustomRe }, } - logrus.Infof("Creating CRD %s", name) - _, err := c.apiExtClientSet.ApiextensionsV1beta1().CustomResourceDefinitions().Create(&crd) - if errors.IsAlreadyExists(err) { - return &crd, nil + if schema.Scope == types.NamespaceScope { + crd.Spec.Scope = apiext.NamespaceScoped + } else { + crd.Spec.Scope = apiext.ClusterScoped } - return &crd, err + + logrus.Infof("Creating CRD %s", name) + crd, err := c.apiExtClientSet.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) + if errors.IsAlreadyExists(err) { + return crd, nil + } + return crd, err } -func (c *Store) getReadyCRDs() (map[string]apiext.CustomResourceDefinition, error) { +func (c *Store) getReadyCRDs() (map[string]*apiext.CustomResourceDefinition, error) { list, err := c.apiExtClientSet.ApiextensionsV1beta1().CustomResourceDefinitions().List(metav1.ListOptions{}) if err != nil { return nil, err } - result := map[string]apiext.CustomResourceDefinition{} + result := map[string]*apiext.CustomResourceDefinition{} - for _, crd := range list.Items { + for i, crd := range list.Items { for _, cond := range crd.Status.Conditions { switch cond.Type { case apiext.Established: if cond.Status == apiext.ConditionTrue { - result[crd.Name] = crd + result[crd.Name] = &list.Items[i] } } } diff --git a/store/empty/empty_store.go b/store/empty/empty_store.go index 4c458e02..12418534 100644 --- a/store/empty/empty_store.go +++ b/store/empty/empty_store.go @@ -1,6 +1,8 @@ package empty -import "github.com/rancher/norman/types" +import ( + "github.com/rancher/norman/types" +) type Store struct { } @@ -24,3 +26,7 @@ func (e *Store) Create(apiContext *types.APIContext, schema *types.Schema, data func (e *Store) Update(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}, id string) (map[string]interface{}, error) { return nil, nil } + +func (e *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt types.QueryOptions) (chan map[string]interface{}, error) { + return nil, nil +} diff --git a/store/proxy/proxy_store.go b/store/proxy/proxy_store.go index d1d02922..2dfe4774 100644 --- a/store/proxy/proxy_store.go +++ b/store/proxy/proxy_store.go @@ -1,6 +1,7 @@ package proxy import ( + ejson "encoding/json" "strings" "github.com/rancher/norman/types" @@ -8,7 +9,14 @@ import ( "github.com/rancher/norman/types/mapper" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer/json" + "k8s.io/apimachinery/pkg/runtime/serializer/streaming" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" + restclientwatch "k8s.io/client-go/rest/watch" ) type Store struct { @@ -61,13 +69,58 @@ func (p *Store) List(apiContext *types.APIContext, schema *types.Schema, opt typ return result, nil } +func (p *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt types.QueryOptions) (chan map[string]interface{}, error) { + namespace := getNamespace(apiContext, opt) + + req := p.common(namespace, p.k8sClient.Get()) + req.VersionedParams(&metav1.ListOptions{ + Watch: true, + }, dynamic.VersionedParameterEncoderWithV1Fallback) + + body, err := req.Stream() + if err != nil { + return nil, err + } + + framer := json.Framer.NewFrameReader(body) + decoder := streaming.NewDecoder(framer, &unstructuredDecoder{}) + watcher := watch.NewStreamWatcher(restclientwatch.NewDecoder(decoder, &unstructuredDecoder{})) + + go func() { + <-apiContext.Request.Context().Done() + watcher.Stop() + }() + + result := make(chan map[string]interface{}) + go func() { + for event := range watcher.ResultChan() { + data := event.Object.(*unstructured.Unstructured) + p.fromInternal(schema, data.Object) + result <- data.Object + } + close(result) + }() + + return result, nil +} + +type unstructuredDecoder struct { +} + +func (d *unstructuredDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { + if into == nil { + into = &unstructured.Unstructured{} + } + return into, defaults, ejson.Unmarshal(data, &into) +} + func getNamespace(apiContext *types.APIContext, opt types.QueryOptions) string { - if val, ok := apiContext.SubContext["namespace"]; ok { + if val, ok := apiContext.SubContext["namespaces"]; ok { return convert.ToString(val) } for _, condition := range opt.Conditions { - if condition.Field == "namespace" && condition.Value != "" { + if condition.Field == "namespaceId" && condition.Value != "" { return condition.Value } } @@ -76,14 +129,14 @@ func getNamespace(apiContext *types.APIContext, opt types.QueryOptions) string { } func (p *Store) Create(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}) (map[string]interface{}, error) { - namespace, _ := data["namespace"].(string) + namespace, _ := data["namespaceId"].(string) p.toInternal(schema.Mapper, data) name, _ := mapper.GetValueN(data, "metadata", "name").(string) if name == "" { generated, _ := mapper.GetValueN(data, "metadata", "generateName").(string) if generated == "" { - mapper.PutValue(data, schema.ID+"-", "metadata", "generateName") + mapper.PutValue(data, strings.ToLower(schema.ID+"-"), "metadata", "generateName") } } diff --git a/store/schema/schema_store.go b/store/schema/schema_store.go index a4d284ec..71f611a9 100644 --- a/store/schema/schema_store.go +++ b/store/schema/schema_store.go @@ -7,6 +7,7 @@ import ( "github.com/rancher/norman/store/empty" "github.com/rancher/norman/types" + "github.com/rancher/norman/types/definition" ) type Store struct { @@ -33,13 +34,25 @@ func (s *Store) ByID(apiContext *types.APIContext, schema *types.Schema, id stri return nil, nil } +func (s *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt types.QueryOptions) (chan map[string]interface{}, error) { + return nil, nil +} + func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt types.QueryOptions) ([]map[string]interface{}, error) { schemaMap := apiContext.Schemas.SchemasForVersion(*apiContext.Version) schemas := make([]*types.Schema, 0, len(schemaMap)) schemaData := make([]map[string]interface{}, 0, len(schemaMap)) + included := map[string]bool{} + for _, schema := range schemaMap { - schemas = append(schemas, schema) + if included[schema.ID] { + continue + } + + if schema.CanList() { + schemas = addSchema(schema, schemaMap, schemas, included) + } } data, err := json.Marshal(schemas) @@ -49,3 +62,43 @@ func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt typ return schemaData, json.Unmarshal(data, &schemaData) } + +func addSchema(schema *types.Schema, schemaMap map[string]*types.Schema, schemas []*types.Schema, included map[string]bool) []*types.Schema { + included[schema.ID] = true + schemas = traverseAndAdd(schema, schemaMap, schemas, included) + schemas = append(schemas, schema) + return schemas +} + +func traverseAndAdd(schema *types.Schema, schemaMap map[string]*types.Schema, schemas []*types.Schema, included map[string]bool) []*types.Schema { + for _, field := range schema.ResourceFields { + t := field.Type + if definition.HasReferenceType(t) { + for !definition.IsReferenceType(t) { + newT := definition.SubType(t) + if newT == t { + break + } + t = newT + } + } + + if refSchema, ok := schemaMap[t]; ok && !included[t] { + schemas = addSchema(refSchema, schemaMap, schemas, included) + } + } + + for _, action := range schema.ResourceActions { + for _, t := range []string{action.Output, action.Input} { + if t == "" { + continue + } + + if refSchema, ok := schemaMap[t]; ok && !included[t] { + schemas = addSchema(refSchema, schemaMap, schemas, included) + } + } + } + + return schemas +} diff --git a/store/tranform/transform.go b/store/tranform/transform.go index 007c04a0..76198bed 100644 --- a/store/tranform/transform.go +++ b/store/tranform/transform.go @@ -6,10 +6,13 @@ type TransformerFunc func(apiContext *types.APIContext, data map[string]interfac type ListTransformerFunc func(apiContext *types.APIContext, data []map[string]interface{}) ([]map[string]interface{}, error) +type StreamTransformerFunc func(apiContext *types.APIContext, data chan map[string]interface{}) (chan map[string]interface{}, error) + type TransformingStore struct { - Store types.Store - Transformer TransformerFunc - ListTransformer ListTransformerFunc + Store types.Store + Transformer TransformerFunc + ListTransformer ListTransformerFunc + StreamTransformer StreamTransformerFunc } func (t *TransformingStore) ByID(apiContext *types.APIContext, schema *types.Schema, id string) (map[string]interface{}, error) { @@ -23,6 +26,30 @@ func (t *TransformingStore) ByID(apiContext *types.APIContext, schema *types.Sch return t.Transformer(apiContext, data) } +func (t *TransformingStore) Watch(apiContext *types.APIContext, schema *types.Schema, opt types.QueryOptions) (chan map[string]interface{}, error) { + c, err := t.Store.Watch(apiContext, schema, opt) + if err != nil { + return nil, err + } + + if t.StreamTransformer != nil { + return t.StreamTransformer(apiContext, c) + } + + result := make(chan map[string]interface{}) + go func() { + for item := range c { + item, err := t.Transformer(apiContext, item) + if err == nil && item != nil { + result <- item + } + } + close(result) + }() + + return result, nil +} + func (t *TransformingStore) List(apiContext *types.APIContext, schema *types.Schema, opt types.QueryOptions) ([]map[string]interface{}, error) { data, err := t.Store.List(apiContext, schema, opt) if err != nil { @@ -43,7 +70,9 @@ func (t *TransformingStore) List(apiContext *types.APIContext, schema *types.Sch if err != nil { return nil, err } - result = append(result, item) + if item != nil { + result = append(result, item) + } } return result, nil diff --git a/store/wrapper/wrapper.go b/store/wrapper/wrapper.go index 9a809ba3..73b26623 100644 --- a/store/wrapper/wrapper.go +++ b/store/wrapper/wrapper.go @@ -36,6 +36,28 @@ func (s *StoreWrapper) List(apiContext *types.APIContext, schema *types.Schema, return apiContext.FilterList(opts, data), nil } +func (s *StoreWrapper) Watch(apiContext *types.APIContext, schema *types.Schema, opt types.QueryOptions) (chan map[string]interface{}, error) { + c, err := s.store.Watch(apiContext, schema, opt) + if err != nil { + return nil, err + } + + result := make(chan map[string]interface{}) + go func() { + for item := range c { + item = apiContext.FilterObject(types.QueryOptions{ + Conditions: apiContext.SubContextAttributeProvider.Query(apiContext, schema), + }, item) + if item != nil { + result <- item + } + } + close(result) + }() + + return result, nil +} + func (s *StoreWrapper) Create(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}) (map[string]interface{}, error) { for key, value := range apiContext.SubContextAttributeProvider.Create(apiContext, schema) { if data == nil { diff --git a/types/definition/definition.go b/types/definition/definition.go index 220563f2..ce07d0df 100644 --- a/types/definition/definition.go +++ b/types/definition/definition.go @@ -14,6 +14,10 @@ func IsReferenceType(fieldType string) bool { return strings.HasPrefix(fieldType, "reference[") && strings.HasSuffix(fieldType, "]") } +func HasReferenceType(fieldType string) bool { + return strings.Contains(fieldType, "reference[") +} + func SubType(fieldType string) string { i := strings.Index(fieldType, "[") if i <= 0 || i >= len(fieldType)-1 { diff --git a/types/mapper.go b/types/mapper.go index 3bd85878..b7ad2c65 100644 --- a/types/mapper.go +++ b/types/mapper.go @@ -1,6 +1,8 @@ package types import ( + "fmt" + "github.com/rancher/norman/types/definition" ) @@ -67,7 +69,7 @@ func (t *typeMapper) FromInternal(data map[string]interface{}) { data["type"] = t.typeName } name, _ := data["name"].(string) - namespace, _ := data["namespace"].(string) + namespace, _ := data["namespaceId"].(string) if _, ok := data["id"]; !ok { if name != "" { @@ -106,7 +108,7 @@ func (t *typeMapper) ToInternal(data map[string]interface{}) { func (t *typeMapper) ModifySchema(schema *Schema, schemas *Schemas) error { t.subSchemas = map[string]*Schema{} t.subArraySchemas = map[string]*Schema{} - t.typeName = schema.ID + t.typeName = fmt.Sprintf("%s/schemas/%s", schema.Version.Path, schema.ID) mapperSchema := schema if schema.InternalSchema != nil { diff --git a/types/mapper/move.go b/types/mapper/move.go index e39d361d..dbb9f290 100644 --- a/types/mapper/move.go +++ b/types/mapper/move.go @@ -7,6 +7,7 @@ import ( "github.com/rancher/norman/types" "github.com/rancher/norman/types/convert" + "github.com/rancher/norman/types/definition" ) type Move struct { @@ -64,7 +65,11 @@ func getField(schema *types.Schema, schemas *types.Schemas, target string) (*typ continue } - subSchema := schemas.Schema(&schema.Version, schema.ResourceFields[part].Type) + fieldType := schema.ResourceFields[part].Type + if definition.IsArrayType(fieldType) { + fieldType = definition.SubType(fieldType) + } + subSchema := schemas.Schema(&schema.Version, fieldType) if subSchema == nil { return nil, "", types.Field{}, false, fmt.Errorf("failed to find field or schema for %s on %s", part, schema.ID) } diff --git a/types/mapper/slice_to_map.go b/types/mapper/slice_to_map.go index b42d1e5c..64886563 100644 --- a/types/mapper/slice_to_map.go +++ b/types/mapper/slice_to_map.go @@ -52,6 +52,11 @@ func (s SliceToMap) ModifySchema(schema *types.Schema, schemas *types.Schemas) e return err } + subSchema, subFieldName, _, _, err := getField(schema, schemas, fmt.Sprintf("%s/%s", s.Field, s.Key)) + if err != nil { + return err + } + field := schema.ResourceFields[s.Field] if !definition.IsArrayType(field.Type) { return fmt.Errorf("field %s on %s is not an array", s.Field, schema.ID) @@ -60,5 +65,7 @@ func (s SliceToMap) ModifySchema(schema *types.Schema, schemas *types.Schemas) e field.Type = "map[" + definition.SubType(field.Type) + "]" schema.ResourceFields[s.Field] = field + delete(subSchema.ResourceFields, subFieldName) + return nil } diff --git a/types/reflection.go b/types/reflection.go index 2c7bd6ee..cbd09f96 100644 --- a/types/reflection.go +++ b/types/reflection.go @@ -174,7 +174,7 @@ func (s *Schemas) importType(version *APIVersion, t reflect.Type, overrides ...r schema.Mapper = mapper s.AddSchema(schema) - return schema, nil + return schema, s.Err() } func jsonName(f reflect.StructField) string { diff --git a/types/schemas.go b/types/schemas.go index 047021ee..64a748e5 100644 --- a/types/schemas.go +++ b/types/schemas.go @@ -63,7 +63,7 @@ func (s *Schemas) AddSchema(schema *Schema) *Schemas { s.errors = append(s.errors, fmt.Errorf("ID is not set on schema: %v", schema)) return s } - if schema.Version.Path == "" || schema.Version.Group == "" || schema.Version.Version == "" { + if schema.Version.Path == "" || schema.Version.Version == "" { s.errors = append(s.errors, fmt.Errorf("version is not set on schema: %s", schema.ID)) return s } diff --git a/types/server_types.go b/types/server_types.go index fd452f82..d00f8295 100644 --- a/types/server_types.go +++ b/types/server_types.go @@ -126,6 +126,7 @@ type QueryOptions struct { Sort Sort Pagination *Pagination Conditions []*QueryCondition + Options map[string]string } type ReferenceValidator interface { @@ -153,4 +154,5 @@ type Store interface { Create(apiContext *APIContext, schema *Schema, data map[string]interface{}) (map[string]interface{}, error) Update(apiContext *APIContext, schema *Schema, data map[string]interface{}, id string) (map[string]interface{}, error) Delete(apiContext *APIContext, schema *Schema, id string) error + Watch(apiContext *APIContext, schema *Schema, opt QueryOptions) (chan map[string]interface{}, error) } diff --git a/types/types.go b/types/types.go index b465f86a..fadfcfb3 100644 --- a/types/types.go +++ b/types/types.go @@ -89,7 +89,7 @@ type Schema struct { Version APIVersion `json:"version"` PluralName string `json:"pluralName,omitempty"` ResourceMethods []string `json:"resourceMethods,omitempty"` - ResourceFields map[string]Field `json:"resourceFields,omitempty"` + ResourceFields map[string]Field `json:"resourceFields"` ResourceActions map[string]Action `json:"resourceActions,omitempty"` CollectionMethods []string `json:"collectionMethods,omitempty"` CollectionFields map[string]Field `json:"collectionFields,omitempty"`