1
0
mirror of https://github.com/rancher/norman.git synced 2025-09-18 08:14:56 +00:00

Refactor how CRD stores are created

This commit is contained in:
Darren Shepherd
2018-06-19 11:26:12 -07:00
parent 22fc71bcb6
commit 07d95eaf3f

View File

@@ -2,10 +2,10 @@ package crd
import (
"context"
"strings"
"time"
"fmt"
"strings"
"sync"
"time"
"github.com/rancher/norman/store/proxy"
"github.com/rancher/norman/types"
@@ -16,14 +16,59 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
)
type Factory struct {
wg sync.WaitGroup
ClientGetter proxy.ClientGetter
}
func (c *Factory) AssignStores(ctx context.Context, storageContext types.StorageContext, schemas ...*types.Schema) error {
schemaStatus, err := c.CreateCRDs(ctx, storageContext, schemas...)
func NewFactoryFromClientGetter(clientGetter proxy.ClientGetter) *Factory {
return &Factory{
ClientGetter: clientGetter,
}
}
func NewFactoryFromClient(config rest.Config) (*Factory, error) {
getter, err := proxy.NewClientGetterFromConfig(config)
if err != nil {
return nil, err
}
return &Factory{
ClientGetter: getter,
}, nil
}
func (f *Factory) BatchWait() {
f.wg.Wait()
}
func (f *Factory) BatchCreateCRDs(ctx context.Context, storageContext types.StorageContext, schemas *types.Schemas, version *types.APIVersion, schemaIDs ...string) {
f.wg.Add(1)
go func() {
defer f.wg.Done()
var schemasToCreate []*types.Schema
for _, schemaID := range schemaIDs {
s := schemas.Schema(version, schemaID)
if s == nil {
panic("can not find schema " + schemaID)
}
schemasToCreate = append(schemasToCreate, s)
}
err := f.AssignStores(ctx, storageContext, schemasToCreate...)
if err != nil {
panic("creating CRD store " + err.Error())
}
}()
}
func (f *Factory) AssignStores(ctx context.Context, storageContext types.StorageContext, schemas ...*types.Schema) error {
schemaStatus, err := f.CreateCRDs(ctx, storageContext, schemas...)
if err != nil {
return err
}
@@ -34,7 +79,7 @@ func (c *Factory) AssignStores(ctx context.Context, storageContext types.Storage
return fmt.Errorf("failed to create create/find CRD for %s", schema.ID)
}
schema.Store = proxy.NewProxyStore(ctx, c.ClientGetter,
schema.Store = proxy.NewProxyStore(ctx, f.ClientGetter,
storageContext,
[]string{"apis"},
crd.Spec.Group,
@@ -46,28 +91,28 @@ func (c *Factory) AssignStores(ctx context.Context, storageContext types.Storage
return nil
}
func (c *Factory) CreateCRDs(ctx context.Context, storageContext types.StorageContext, schemas ...*types.Schema) (map[*types.Schema]*apiext.CustomResourceDefinition, error) {
func (f *Factory) CreateCRDs(ctx context.Context, storageContext types.StorageContext, schemas ...*types.Schema) (map[*types.Schema]*apiext.CustomResourceDefinition, error) {
schemaStatus := map[*types.Schema]*apiext.CustomResourceDefinition{}
apiClient, err := c.ClientGetter.APIExtClient(nil, storageContext)
apiClient, err := f.ClientGetter.APIExtClient(nil, storageContext)
if err != nil {
return nil, err
}
ready, err := c.getReadyCRDs(apiClient)
ready, err := f.getReadyCRDs(apiClient)
if err != nil {
return nil, err
}
for _, schema := range schemas {
crd, err := c.createCRD(apiClient, schema, ready)
crd, err := f.createCRD(apiClient, schema, ready)
if err != nil {
return nil, err
}
schemaStatus[schema] = crd
}
ready, err = c.getReadyCRDs(apiClient)
ready, err = f.getReadyCRDs(apiClient)
if err != nil {
return nil, err
}
@@ -76,7 +121,7 @@ func (c *Factory) CreateCRDs(ctx context.Context, storageContext types.StorageCo
if readyCrd, ok := ready[crd.Name]; ok {
schemaStatus[schema] = readyCrd
} else {
if err := c.waitCRD(ctx, apiClient, crd.Name, schema, schemaStatus); err != nil {
if err := f.waitCRD(ctx, apiClient, crd.Name, schema, schemaStatus); err != nil {
return nil, err
}
}
@@ -85,7 +130,7 @@ func (c *Factory) CreateCRDs(ctx context.Context, storageContext types.StorageCo
return schemaStatus, nil
}
func (c *Factory) waitCRD(ctx context.Context, apiClient clientset.Interface, crdName string, schema *types.Schema, schemaStatus map[*types.Schema]*apiext.CustomResourceDefinition) error {
func (f *Factory) waitCRD(ctx context.Context, apiClient clientset.Interface, crdName string, schema *types.Schema, schemaStatus map[*types.Schema]*apiext.CustomResourceDefinition) error {
logrus.Infof("Waiting for CRD %s to become available", crdName)
defer logrus.Infof("Done waiting for CRD %s to become available", crdName)
@@ -119,7 +164,7 @@ func (c *Factory) waitCRD(ctx context.Context, apiClient clientset.Interface, cr
})
}
func (c *Factory) createCRD(apiClient clientset.Interface, schema *types.Schema, ready map[string]*apiext.CustomResourceDefinition) (*apiext.CustomResourceDefinition, error) {
func (f *Factory) createCRD(apiClient clientset.Interface, schema *types.Schema, ready map[string]*apiext.CustomResourceDefinition) (*apiext.CustomResourceDefinition, error) {
plural := strings.ToLower(schema.PluralName)
name := strings.ToLower(plural + "." + schema.Version.Group)
@@ -156,7 +201,7 @@ func (c *Factory) createCRD(apiClient clientset.Interface, schema *types.Schema,
return crd, err
}
func (c *Factory) getReadyCRDs(apiClient clientset.Interface) (map[string]*apiext.CustomResourceDefinition, error) {
func (f *Factory) getReadyCRDs(apiClient clientset.Interface) (map[string]*apiext.CustomResourceDefinition, error) {
list, err := apiClient.ApiextensionsV1beta1().CustomResourceDefinitions().List(metav1.ListOptions{})
if err != nil {
return nil, err