diff --git a/pkg/client/factory.go b/pkg/client/factory.go index d7c34c0..95c9aab 100644 --- a/pkg/client/factory.go +++ b/pkg/client/factory.go @@ -2,12 +2,14 @@ package client import ( "fmt" + "net/http" "time" "github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/schemaserver/types" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/dynamic" + "k8s.io/client-go/metadata" "k8s.io/client-go/rest" ) @@ -15,25 +17,49 @@ type Factory struct { impersonate bool clientCfg *rest.Config watchClientCfg *rest.Config - client dynamic.Interface + metadata metadata.Interface Config *rest.Config } +type addQuery struct { + values map[string]string + next http.RoundTripper +} + +func (a *addQuery) RoundTrip(req *http.Request) (*http.Response, error) { + q := req.URL.Query() + for k, v := range a.values { + q.Set(k, v) + } + req.Header.Set("Accept", "application/json;as=Table;v=v1;g=meta.k8s.io") + req.URL.RawQuery = q.Encode() + return a.next.RoundTrip(req) +} + func NewFactory(cfg *rest.Config, impersonate bool) (*Factory, error) { clientCfg := rest.CopyConfig(cfg) clientCfg.QPS = 10000 clientCfg.Burst = 100 + clientCfg.AcceptContentTypes = "application/json;as=Table;v=v1;g=meta.k8s.io" + clientCfg.Wrap(func(rt http.RoundTripper) http.RoundTripper { + return &addQuery{ + values: map[string]string{ + "includeObject": "Object", + }, + next: rt, + } + }) watchClientCfg := rest.CopyConfig(cfg) watchClientCfg.Timeout = 30 * time.Minute - dc, err := dynamic.NewForConfig(watchClientCfg) + md, err := metadata.NewForConfig(cfg) if err != nil { return nil, err } return &Factory{ - client: dc, + metadata: md, impersonate: impersonate, clientCfg: clientCfg, watchClientCfg: watchClientCfg, @@ -41,8 +67,8 @@ func NewFactory(cfg *rest.Config, impersonate bool) (*Factory, error) { }, nil } -func (p *Factory) DynamicClient() dynamic.Interface { - return p.client +func (p *Factory) MetadataClient() metadata.Interface { + return p.metadata } func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { diff --git a/pkg/clustercache/controller.go b/pkg/clustercache/controller.go index 3f7d85a..f4fba9d 100644 --- a/pkg/clustercache/controller.go +++ b/pkg/clustercache/controller.go @@ -15,20 +15,15 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" schema2 "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/metadata" + "k8s.io/client-go/metadata/metadatainformer" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) -var ( - logOnce = sync.Once{} -) - type Handler func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error type ClusterCache interface { - AddController(gvk schema2.GroupVersionKind, informer cache.SharedIndexInformer) List(gvr schema2.GroupVersionResource) []interface{} OnAdd(ctx context.Context, handler Handler) OnRemove(ctx context.Context, handler Handler) @@ -56,7 +51,7 @@ type clusterCache struct { ctx context.Context typed map[schema2.GroupVersionKind]cache.SharedIndexInformer - informerFactory dynamicinformer.DynamicSharedInformerFactory + informerFactory metadatainformer.SharedInformerFactory controllerFactory generic.ControllerManager watchers map[schema2.GroupVersionResource]*watcher workqueue workqueue.DelayingInterface @@ -66,11 +61,11 @@ type clusterCache struct { changeHandlers cancelCollection } -func NewClusterCache(ctx context.Context, client dynamic.Interface) ClusterCache { +func NewClusterCache(ctx context.Context, client metadata.Interface) ClusterCache { c := &clusterCache{ ctx: ctx, typed: map[schema2.GroupVersionKind]cache.SharedIndexInformer{}, - informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, 2*time.Hour), + informerFactory: metadatainformer.NewSharedInformerFactory(client, 2*time.Hour), watchers: map[schema2.GroupVersionResource]*watcher{}, workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"), } @@ -78,10 +73,6 @@ func NewClusterCache(ctx context.Context, client dynamic.Interface) ClusterCache return c } -func (h *clusterCache) AddController(gvk schema2.GroupVersionKind, informer cache.SharedIndexInformer) { - h.typed[gvk] = informer -} - func validSchema(schema *types.APISchema) bool { canList := false canWatch := false @@ -170,7 +161,7 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { w.start = true } - logrus.Infof("Watching counts for %s", gvk.String()) + logrus.Infof("Watching metadata for %s", gvk.String()) h.addResourceEventHandler(gvr, w.informer) name := fmt.Sprintf("meta %s", gvk) h.controllerFactory.AddHandler(ctx, gvk, w.informer, name, func(key string, obj runtime.Object) (object runtime.Object, e error) { @@ -180,7 +171,7 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error { for gvr, w := range h.watchers { if !gvrs[gvr] { - logrus.Infof("Stopping count watch on %s", gvr) + logrus.Infof("Stopping metadata watch on %s", gvr) w.cancel() delete(h.watchers, gvr) } diff --git a/pkg/controllers/schema/schemas.go b/pkg/controllers/schema/schemas.go index 7c138f6..419c786 100644 --- a/pkg/controllers/schema/schemas.go +++ b/pkg/controllers/schema/schemas.go @@ -10,9 +10,12 @@ import ( schema2 "github.com/rancher/steve/pkg/schema" "github.com/rancher/steve/pkg/schema/converter" "github.com/rancher/steve/pkg/schemaserver/types" + "github.com/rancher/steve/pkg/server/resources/common" apiextcontrollerv1beta1 "github.com/rancher/wrangler-api/pkg/generated/controllers/apiextensions.k8s.io/v1beta1" v1 "github.com/rancher/wrangler-api/pkg/generated/controllers/apiregistration.k8s.io/v1" "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" authorizationv1 "k8s.io/api/authorization/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/client-go/discovery" @@ -20,6 +23,10 @@ import ( apiv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" ) +var ( + listPool = semaphore.NewWeighted(10) +) + type SchemasHandler interface { OnSchemas(schemas *schema2.Collection) error } @@ -31,14 +38,14 @@ type handler struct { toSync int32 schemas *schema2.Collection client discovery.DiscoveryInterface + cols *common.DynamicColumns crd apiextcontrollerv1beta1.CustomResourceDefinitionClient ssar authorizationv1client.SelfSubjectAccessReviewInterface handler SchemasHandler - - running map[string]func() } func Register(ctx context.Context, + cols *common.DynamicColumns, discovery discovery.DiscoveryInterface, crd apiextcontrollerv1beta1.CustomResourceDefinitionController, apiService v1.APIServiceController, @@ -48,12 +55,12 @@ func Register(ctx context.Context, h := &handler{ ctx: ctx, + cols: cols, client: discovery, schemas: schemas, handler: schemasHandler, crd: crd, ssar: ssar, - running: map[string]func(){}, } apiService.OnChange(ctx, "schema", h.OnChangeAPIService) @@ -105,6 +112,28 @@ func isListWatchable(schema *types.APISchema) bool { return canList && canWatch } +func (h *handler) getColumns(ctx context.Context, schemas map[string]*types.APISchema) error { + eg := errgroup.Group{} + + for _, schema := range schemas { + if !isListWatchable(schema) { + continue + } + + if err := listPool.Acquire(ctx, 1); err != nil { + return err + } + + s := schema + eg.Go(func() error { + defer listPool.Release(1) + return h.cols.SetColumns(s) + }) + } + + return eg.Wait() +} + func (h *handler) refreshAll() error { h.Lock() defer h.Unlock() @@ -131,7 +160,10 @@ func (h *handler) refreshAll() error { filteredSchemas[id] = schema } - h.startStopTemplate(filteredSchemas) + if err := h.getColumns(h.ctx, filteredSchemas); err != nil { + return err + } + h.schemas.Reset(filteredSchemas) if h.handler != nil { return h.handler.OnSchemas(h.schemas) @@ -140,32 +172,6 @@ func (h *handler) refreshAll() error { return nil } -func (h *handler) startStopTemplate(schemas map[string]*types.APISchema) { - for id := range schemas { - if _, ok := h.running[id]; ok { - continue - } - template := h.schemas.TemplateForSchemaID(id) - if template == nil || template.Start == nil { - continue - } - - subCtx, cancel := context.WithCancel(h.ctx) - if err := template.Start(subCtx); err != nil { - logrus.Errorf("failed to start schema template: %s", id) - continue - } - h.running[id] = cancel - } - - for id, cancel := range h.running { - if _, ok := schemas[id]; !ok { - cancel() - delete(h.running, id) - } - } -} - func (h *handler) allowed(schema *types.APISchema) (bool, error) { gvr := attributes.GVR(schema) ssar, err := h.ssar.Create(&authorizationv1.SelfSubjectAccessReview{ diff --git a/pkg/schema/defaultmapper.go b/pkg/schema/defaultmapper.go deleted file mode 100644 index 6765f16..0000000 --- a/pkg/schema/defaultmapper.go +++ /dev/null @@ -1,39 +0,0 @@ -package schema - -import ( - "fmt" - - "github.com/rancher/steve/pkg/schemaserver/types" - "github.com/rancher/wrangler/pkg/data" - "github.com/rancher/wrangler/pkg/schemas" - "github.com/rancher/wrangler/pkg/schemas/mappers" -) - -func newDefaultMapper() schemas.Mapper { - return &defaultMapper{} -} - -type defaultMapper struct { - mappers.EmptyMapper -} - -func (d *defaultMapper) FromInternal(data data.Object) { - if data["kind"] != "" && data["apiVersion"] != "" { - if t, ok := data["type"]; ok && data != nil { - data["_type"] = t - } - } - - if _, ok := data["id"]; ok || data == nil { - return - } - - name := types.Name(data) - namespace := types.Namespace(data) - - if namespace == "" { - data["id"] = name - } else { - data["id"] = fmt.Sprintf("%s/%s", namespace, name) - } -} diff --git a/pkg/schema/table/mapper.go b/pkg/schema/table/mapper.go index be98f82..b73f822 100644 --- a/pkg/schema/table/mapper.go +++ b/pkg/schema/table/mapper.go @@ -9,10 +9,12 @@ import ( ) type Column struct { - Name string `json:"name,omitempty"` - Field string `json:"field,omitempty"` - Type string `json:"type,omitempty"` - Format string `json:"format,omitempty"` + Name string `json:"name,omitempty"` + Field string `json:"field,omitempty"` + Type string `json:"type,omitempty"` + Format string `json:"format,omitempty"` + Description string `json:"description,omitempty"` + Priority int `json:"priority,omitempty"` } type Table struct { diff --git a/pkg/schemaserver/types/schemas.go b/pkg/schemaserver/types/schemas.go index 182c464..9f429e0 100644 --- a/pkg/schemaserver/types/schemas.go +++ b/pkg/schemaserver/types/schemas.go @@ -10,14 +10,14 @@ import ( type APISchemas struct { InternalSchemas *schemas.Schemas Schemas map[string]*APISchema - index map[string]*APISchema + index map[string]*APISchema } func EmptyAPISchemas() *APISchemas { return &APISchemas{ InternalSchemas: schemas.EmptySchemas(), Schemas: map[string]*APISchema{}, - index:map[string]*APISchema{}, + index: map[string]*APISchema{}, } } diff --git a/pkg/server/resources/common/defaultcolumns.go b/pkg/server/resources/common/defaultcolumns.go deleted file mode 100644 index f5ab021..0000000 --- a/pkg/server/resources/common/defaultcolumns.go +++ /dev/null @@ -1,42 +0,0 @@ -package common - -import ( - "github.com/rancher/steve/pkg/attributes" - "github.com/rancher/steve/pkg/schema/table" - "github.com/rancher/steve/pkg/schemaserver/types" - "github.com/rancher/wrangler/pkg/schemas" - "github.com/rancher/wrangler/pkg/schemas/mappers" -) - -var ( - NameColumn = table.Column{ - Name: "Name", - Field: "metadata.name", - Type: "string", - Format: "name", - } - CreatedColumn = table.Column{ - Name: "Created", - Field: "metadata.creationTimestamp", - Type: "string", - Format: "date", - } -) - -type DefaultColumns struct { - mappers.EmptyMapper -} - -func (d *DefaultColumns) ModifySchema(schema *schemas.Schema, schemas *schemas.Schemas) error { - as := &types.APISchema{ - Schema: schema, - } - if attributes.Columns(as) == nil { - attributes.SetColumns(as, []table.Column{ - NameColumn, - CreatedColumn, - }) - } - - return nil -} diff --git a/pkg/server/resources/common/dynamiccolumns.go b/pkg/server/resources/common/dynamiccolumns.go index 81a0926..15cadbf 100644 --- a/pkg/server/resources/common/dynamiccolumns.go +++ b/pkg/server/resources/common/dynamiccolumns.go @@ -1,11 +1,12 @@ package common import ( - "net/http" + "fmt" "github.com/rancher/steve/pkg/attributes" - "github.com/rancher/steve/pkg/schema/table" "github.com/rancher/steve/pkg/schemaserver/types" + "github.com/rancher/wrangler/pkg/ratelimit" + "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" "k8s.io/apimachinery/pkg/runtime" @@ -18,6 +19,11 @@ type DynamicColumns struct { client *rest.RESTClient } +type ColumnDefinition struct { + metav1.TableColumnDefinition `json:",inline"` + Field string `json:"field,omitempty"` +} + func NewDynamicColumns(config *rest.Config) (*DynamicColumns, error) { c, err := newClient(config) if err != nil { @@ -28,15 +34,6 @@ func NewDynamicColumns(config *rest.Config) (*DynamicColumns, error) { }, nil } -func hasGet(methods []string) bool { - for _, method := range methods { - if method == http.MethodGet { - return true - } - } - return false -} - func (d *DynamicColumns) SetColumns(schema *types.APISchema) error { if attributes.Columns(schema) != nil { return nil @@ -46,11 +43,6 @@ func (d *DynamicColumns) SetColumns(schema *types.APISchema) error { if gvr.Resource == "" { return nil } - nsed := attributes.Namespaced(schema) - - if !hasGet(schema.CollectionMethods) { - return nil - } r := d.client.Get() if gvr.Group == "" { @@ -59,33 +51,29 @@ func (d *DynamicColumns) SetColumns(schema *types.APISchema) error { r.Prefix("apis", gvr.Group) } r.Prefix(gvr.Version) - if nsed { - r.Prefix("namespaces", "default") - } r.Prefix(gvr.Resource) + r.VersionedParams(&metav1.ListOptions{ + Limit: 1, + }, metav1.ParameterCodec) obj, err := r.Do().Get() if err != nil { - return err + return nil } t, ok := obj.(*metav1.Table) if !ok { return nil } - var cols []table.Column - for _, cd := range t.ColumnDefinitions { - cols = append(cols, table.Column{ - Name: cd.Name, - Field: "metadata.computed.fields." + cd.Name, - Type: cd.Type, - Format: cd.Format, - }) - } - - if len(cols) > 0 { + if len(t.ColumnDefinitions) > 0 { + var cols []ColumnDefinition + for i, colDef := range t.ColumnDefinitions { + cols = append(cols, ColumnDefinition{ + TableColumnDefinition: colDef, + Field: fmt.Sprintf("$.metadata.fields[%d]", i), + }) + } attributes.SetColumns(schema, cols) - schema.Attributes["server-side-column"] = "true" } return nil @@ -93,6 +81,9 @@ func (d *DynamicColumns) SetColumns(schema *types.APISchema) error { func newClient(config *rest.Config) (*rest.RESTClient, error) { scheme := runtime.NewScheme() + if err := internalversion.AddToScheme(scheme); err != nil { + return nil, err + } if err := metav1.AddMetaToScheme(scheme); err != nil { return nil, err } @@ -101,9 +92,9 @@ func newClient(config *rest.Config) (*rest.RESTClient, error) { } config = rest.CopyConfig(config) + config.RateLimiter = ratelimit.None config.UserAgent = rest.DefaultKubernetesUserAgent() - config.AcceptContentTypes = "application/json;as=Table;v=v1beta1;g=meta.k8s.io" - config.ContentType = "application/json;as=Table;v=v1beta1;g=meta.k8s.io" + config.AcceptContentTypes = "application/json;as=Table;v=v1;g=meta.k8s.io" config.GroupVersion = &schema.GroupVersion{} config.NegotiatedSerializer = serializer.NewCodecFactory(scheme) config.APIPath = "/" diff --git a/pkg/server/resources/common/formatter.go b/pkg/server/resources/common/formatter.go index 4f7a40b..ebef5f7 100644 --- a/pkg/server/resources/common/formatter.go +++ b/pkg/server/resources/common/formatter.go @@ -11,7 +11,6 @@ func DefaultTemplate(clientGetter proxy.ClientGetter) schema.Template { return schema.Template{ Store: proxy.NewProxyStore(clientGetter), Formatter: Formatter, - Mapper: &DefaultColumns{}, } } diff --git a/pkg/server/server.go b/pkg/server/server.go index 4ba353f..64a47e0 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -14,6 +14,7 @@ import ( "github.com/rancher/steve/pkg/schemaserver/types" "github.com/rancher/steve/pkg/server/handler" "github.com/rancher/steve/pkg/server/resources" + "github.com/rancher/steve/pkg/server/resources/common" ) var ErrConfigRequired = errors.New("rest config is required") @@ -52,18 +53,24 @@ func setup(ctx context.Context, server *Server) (http.Handler, *schema.Collectio return nil, nil, err } - ccache := clustercache.NewClusterCache(ctx, cf.DynamicClient()) + ccache := clustercache.NewClusterCache(ctx, cf.MetadataClient()) server.BaseSchemas = resources.DefaultSchemas(server.BaseSchemas, server.K8s.Discovery(), ccache) server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf)...) asl := server.AccessSetLookup if asl == nil { - asl = accesscontrol.NewAccessStore(server.RBAC) + asl = accesscontrol.NewAccessStore(ctx, true, server.RBAC) } - sf := schema.NewCollection(server.BaseSchemas, asl) + cols, err := common.NewDynamicColumns(server.RestConfig) + if err != nil { + return nil, nil, err + } + + sf := schema.NewCollection(ctx, server.BaseSchemas, asl) sync := schemacontroller.Register(ctx, + cols, server.K8s.Discovery(), server.CRD.CustomResourceDefinition(), server.API.APIService(), diff --git a/pkg/server/store/proxy/client.go b/pkg/server/store/proxy/client.go deleted file mode 100644 index d08f65f..0000000 --- a/pkg/server/store/proxy/client.go +++ /dev/null @@ -1,66 +0,0 @@ -package proxy - -import ( - "fmt" - - "github.com/rancher/steve/pkg/attributes" - "github.com/rancher/steve/pkg/schemaserver/httperror" - "github.com/rancher/steve/pkg/schemaserver/types" - "github.com/rancher/wrangler/pkg/schemas/validation" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apiserver/pkg/authentication/user" - "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/rest" -) - -type ClientFactory struct { - cfg rest.Config - client dynamic.Interface - impersonate bool - idToGVR map[string]schema.GroupVersionResource -} - -func NewClientFactory(cfg *rest.Config, impersonate bool) *ClientFactory { - return &ClientFactory{ - impersonate: impersonate, - cfg: *cfg, - idToGVR: map[string]schema.GroupVersionResource{}, - } -} - -func (p *ClientFactory) Client(ctx *types.APIRequest, schema *types.APISchema) (dynamic.ResourceInterface, error) { - gvr := attributes.GVR(schema) - if gvr.Resource == "" { - return nil, httperror.NewAPIError(validation.NotFound, "Failed to find gvr for "+schema.ID) - } - - user, ok := request.UserFrom(ctx.Request.Context()) - if !ok { - return nil, fmt.Errorf("failed to find user context for client") - } - - client, err := p.getClient(user) - if err != nil { - return nil, err - } - - return client.Resource(gvr), nil -} - -func (p *ClientFactory) getClient(user user.Info) (dynamic.Interface, error) { - if p.impersonate { - return p.client, nil - } - - if user.GetName() == "" { - return nil, fmt.Errorf("failed to determine current user") - } - - newCfg := p.cfg - newCfg.Impersonate.UserName = user.GetName() - newCfg.Impersonate.Groups = user.GetGroups() - newCfg.Impersonate.Extra = user.GetExtra() - - return dynamic.NewForConfig(&newCfg) -} diff --git a/pkg/server/store/proxy/proxy_store.go b/pkg/server/store/proxy/proxy_store.go index a4e83c4..e5c71a7 100644 --- a/pkg/server/store/proxy/proxy_store.go +++ b/pkg/server/store/proxy/proxy_store.go @@ -96,7 +96,9 @@ func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, id string return nil, err } - return k8sClient.Get(id, opts) + obj, err := k8sClient.Get(id, opts) + rowToObject(obj) + return obj, err } func moveFromUnderscore(obj map[string]interface{}) map[string]interface{} { @@ -130,6 +132,51 @@ func moveToUnderscore(obj *unstructured.Unstructured) *unstructured.Unstructured return obj } +func rowToObject(obj *unstructured.Unstructured) { + if obj.Object["kind"] != "Table" || + obj.Object["apiVersion"] != "meta.k8s.io/v1" { + return + } + + items := tableToObjects(obj.Object) + if len(items) == 1 { + obj.Object = items[0].Object + } +} + +func tableToList(obj *unstructured.UnstructuredList) { + if obj.Object["kind"] != "Table" || + obj.Object["apiVersion"] != "meta.k8s.io/v1" { + return + } + + obj.Items = tableToObjects(obj.Object) +} + +func tableToObjects(obj map[string]interface{}) []unstructured.Unstructured { + var result []unstructured.Unstructured + + rows, _ := obj["rows"].([]interface{}) + for _, row := range rows { + m, ok := row.(map[string]interface{}) + if !ok { + continue + } + cells := m["cells"] + object, ok := m["object"].(map[string]interface{}) + if !ok { + continue + } + + data.PutValue(object, cells, "metadata", "fields") + result = append(result, unstructured.Unstructured{ + Object: object, + }) + } + + return result +} + func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { k8sClient, err := s.clientGetter.Client(apiOp, schema, apiOp.Namespace) if err != nil { @@ -146,6 +193,8 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP return types.APIObjectList{}, err } + tableToList(resultList) + result := types.APIObjectList{ Revision: resultList.GetResourceVersion(), Continue: resultList.GetContinue(), @@ -230,6 +279,10 @@ func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et name = types.CreateAPIEvent } + if unstr, ok := obj.(*unstructured.Unstructured); ok { + rowToObject(unstr) + } + event := types.APIEvent{ Name: name, Object: toAPI(schema, obj),