From 07d95eaf3f6cf856b053bed1cd810d3a0bf78ce4 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Tue, 19 Jun 2018 11:26:12 -0700 Subject: [PATCH] Refactor how CRD stores are created --- store/crd/init.go | 75 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 60 insertions(+), 15 deletions(-) diff --git a/store/crd/init.go b/store/crd/init.go index 7800e875..6c40c0c7 100644 --- a/store/crd/init.go +++ b/store/crd/init.go @@ -2,10 +2,10 @@ package crd import ( "context" - "strings" - "time" - "fmt" + "strings" + "sync" + "time" "github.com/rancher/norman/store/proxy" "github.com/rancher/norman/types" @@ -16,14 +16,59 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/rest" ) type Factory struct { + wg sync.WaitGroup ClientGetter proxy.ClientGetter } -func (c *Factory) AssignStores(ctx context.Context, storageContext types.StorageContext, schemas ...*types.Schema) error { - schemaStatus, err := c.CreateCRDs(ctx, storageContext, schemas...) +func NewFactoryFromClientGetter(clientGetter proxy.ClientGetter) *Factory { + return &Factory{ + ClientGetter: clientGetter, + } +} + +func NewFactoryFromClient(config rest.Config) (*Factory, error) { + getter, err := proxy.NewClientGetterFromConfig(config) + if err != nil { + return nil, err + } + + return &Factory{ + ClientGetter: getter, + }, nil +} + +func (f *Factory) BatchWait() { + f.wg.Wait() +} + +func (f *Factory) BatchCreateCRDs(ctx context.Context, storageContext types.StorageContext, schemas *types.Schemas, version *types.APIVersion, schemaIDs ...string) { + f.wg.Add(1) + go func() { + defer f.wg.Done() + + var schemasToCreate []*types.Schema + + for _, schemaID := range schemaIDs { + s := schemas.Schema(version, schemaID) + if s == nil { + panic("can not find schema " + schemaID) + } + schemasToCreate = append(schemasToCreate, s) + } + + err := f.AssignStores(ctx, storageContext, schemasToCreate...) + if err != nil { + panic("creating CRD store " + err.Error()) + } + }() +} + +func (f *Factory) AssignStores(ctx context.Context, storageContext types.StorageContext, schemas ...*types.Schema) error { + schemaStatus, err := f.CreateCRDs(ctx, storageContext, schemas...) if err != nil { return err } @@ -34,7 +79,7 @@ func (c *Factory) AssignStores(ctx context.Context, storageContext types.Storage return fmt.Errorf("failed to create create/find CRD for %s", schema.ID) } - schema.Store = proxy.NewProxyStore(ctx, c.ClientGetter, + schema.Store = proxy.NewProxyStore(ctx, f.ClientGetter, storageContext, []string{"apis"}, crd.Spec.Group, @@ -46,28 +91,28 @@ func (c *Factory) AssignStores(ctx context.Context, storageContext types.Storage return nil } -func (c *Factory) CreateCRDs(ctx context.Context, storageContext types.StorageContext, schemas ...*types.Schema) (map[*types.Schema]*apiext.CustomResourceDefinition, error) { +func (f *Factory) CreateCRDs(ctx context.Context, storageContext types.StorageContext, schemas ...*types.Schema) (map[*types.Schema]*apiext.CustomResourceDefinition, error) { schemaStatus := map[*types.Schema]*apiext.CustomResourceDefinition{} - apiClient, err := c.ClientGetter.APIExtClient(nil, storageContext) + apiClient, err := f.ClientGetter.APIExtClient(nil, storageContext) if err != nil { return nil, err } - ready, err := c.getReadyCRDs(apiClient) + ready, err := f.getReadyCRDs(apiClient) if err != nil { return nil, err } for _, schema := range schemas { - crd, err := c.createCRD(apiClient, schema, ready) + crd, err := f.createCRD(apiClient, schema, ready) if err != nil { return nil, err } schemaStatus[schema] = crd } - ready, err = c.getReadyCRDs(apiClient) + ready, err = f.getReadyCRDs(apiClient) if err != nil { return nil, err } @@ -76,7 +121,7 @@ func (c *Factory) CreateCRDs(ctx context.Context, storageContext types.StorageCo if readyCrd, ok := ready[crd.Name]; ok { schemaStatus[schema] = readyCrd } else { - if err := c.waitCRD(ctx, apiClient, crd.Name, schema, schemaStatus); err != nil { + if err := f.waitCRD(ctx, apiClient, crd.Name, schema, schemaStatus); err != nil { return nil, err } } @@ -85,7 +130,7 @@ func (c *Factory) CreateCRDs(ctx context.Context, storageContext types.StorageCo return schemaStatus, nil } -func (c *Factory) waitCRD(ctx context.Context, apiClient clientset.Interface, crdName string, schema *types.Schema, schemaStatus map[*types.Schema]*apiext.CustomResourceDefinition) error { +func (f *Factory) waitCRD(ctx context.Context, apiClient clientset.Interface, crdName string, schema *types.Schema, schemaStatus map[*types.Schema]*apiext.CustomResourceDefinition) error { logrus.Infof("Waiting for CRD %s to become available", crdName) defer logrus.Infof("Done waiting for CRD %s to become available", crdName) @@ -119,7 +164,7 @@ func (c *Factory) waitCRD(ctx context.Context, apiClient clientset.Interface, cr }) } -func (c *Factory) createCRD(apiClient clientset.Interface, schema *types.Schema, ready map[string]*apiext.CustomResourceDefinition) (*apiext.CustomResourceDefinition, error) { +func (f *Factory) createCRD(apiClient clientset.Interface, schema *types.Schema, ready map[string]*apiext.CustomResourceDefinition) (*apiext.CustomResourceDefinition, error) { plural := strings.ToLower(schema.PluralName) name := strings.ToLower(plural + "." + schema.Version.Group) @@ -156,7 +201,7 @@ func (c *Factory) createCRD(apiClient clientset.Interface, schema *types.Schema, return crd, err } -func (c *Factory) getReadyCRDs(apiClient clientset.Interface) (map[string]*apiext.CustomResourceDefinition, error) { +func (f *Factory) getReadyCRDs(apiClient clientset.Interface) (map[string]*apiext.CustomResourceDefinition, error) { list, err := apiClient.ApiextensionsV1beta1().CustomResourceDefinitions().List(metav1.ListOptions{}) if err != nil { return nil, err