Show server side columns

This commit is contained in:
Darren Shepherd 2020-02-08 13:04:20 -07:00
parent 9f771dcf65
commit 600f875fce
12 changed files with 171 additions and 243 deletions

View File

@ -2,12 +2,14 @@ package client
import ( import (
"fmt" "fmt"
"net/http"
"time" "time"
"github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/schemaserver/types" "github.com/rancher/steve/pkg/schemaserver/types"
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
) )
@ -15,25 +17,49 @@ type Factory struct {
impersonate bool impersonate bool
clientCfg *rest.Config clientCfg *rest.Config
watchClientCfg *rest.Config watchClientCfg *rest.Config
client dynamic.Interface metadata metadata.Interface
Config *rest.Config Config *rest.Config
} }
type addQuery struct {
values map[string]string
next http.RoundTripper
}
func (a *addQuery) RoundTrip(req *http.Request) (*http.Response, error) {
q := req.URL.Query()
for k, v := range a.values {
q.Set(k, v)
}
req.Header.Set("Accept", "application/json;as=Table;v=v1;g=meta.k8s.io")
req.URL.RawQuery = q.Encode()
return a.next.RoundTrip(req)
}
func NewFactory(cfg *rest.Config, impersonate bool) (*Factory, error) { func NewFactory(cfg *rest.Config, impersonate bool) (*Factory, error) {
clientCfg := rest.CopyConfig(cfg) clientCfg := rest.CopyConfig(cfg)
clientCfg.QPS = 10000 clientCfg.QPS = 10000
clientCfg.Burst = 100 clientCfg.Burst = 100
clientCfg.AcceptContentTypes = "application/json;as=Table;v=v1;g=meta.k8s.io"
clientCfg.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return &addQuery{
values: map[string]string{
"includeObject": "Object",
},
next: rt,
}
})
watchClientCfg := rest.CopyConfig(cfg) watchClientCfg := rest.CopyConfig(cfg)
watchClientCfg.Timeout = 30 * time.Minute watchClientCfg.Timeout = 30 * time.Minute
dc, err := dynamic.NewForConfig(watchClientCfg) md, err := metadata.NewForConfig(cfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Factory{ return &Factory{
client: dc, metadata: md,
impersonate: impersonate, impersonate: impersonate,
clientCfg: clientCfg, clientCfg: clientCfg,
watchClientCfg: watchClientCfg, watchClientCfg: watchClientCfg,
@ -41,8 +67,8 @@ func NewFactory(cfg *rest.Config, impersonate bool) (*Factory, error) {
}, nil }, nil
} }
func (p *Factory) DynamicClient() dynamic.Interface { func (p *Factory) MetadataClient() metadata.Interface {
return p.client return p.metadata
} }
func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) {

View File

@ -15,20 +15,15 @@ import (
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
schema2 "k8s.io/apimachinery/pkg/runtime/schema" schema2 "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic" "k8s.io/client-go/metadata"
"k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
) )
var (
logOnce = sync.Once{}
)
type Handler func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error type Handler func(gvr schema2.GroupVersionResource, key string, obj runtime.Object) error
type ClusterCache interface { type ClusterCache interface {
AddController(gvk schema2.GroupVersionKind, informer cache.SharedIndexInformer)
List(gvr schema2.GroupVersionResource) []interface{} List(gvr schema2.GroupVersionResource) []interface{}
OnAdd(ctx context.Context, handler Handler) OnAdd(ctx context.Context, handler Handler)
OnRemove(ctx context.Context, handler Handler) OnRemove(ctx context.Context, handler Handler)
@ -56,7 +51,7 @@ type clusterCache struct {
ctx context.Context ctx context.Context
typed map[schema2.GroupVersionKind]cache.SharedIndexInformer typed map[schema2.GroupVersionKind]cache.SharedIndexInformer
informerFactory dynamicinformer.DynamicSharedInformerFactory informerFactory metadatainformer.SharedInformerFactory
controllerFactory generic.ControllerManager controllerFactory generic.ControllerManager
watchers map[schema2.GroupVersionResource]*watcher watchers map[schema2.GroupVersionResource]*watcher
workqueue workqueue.DelayingInterface workqueue workqueue.DelayingInterface
@ -66,11 +61,11 @@ type clusterCache struct {
changeHandlers cancelCollection changeHandlers cancelCollection
} }
func NewClusterCache(ctx context.Context, client dynamic.Interface) ClusterCache { func NewClusterCache(ctx context.Context, client metadata.Interface) ClusterCache {
c := &clusterCache{ c := &clusterCache{
ctx: ctx, ctx: ctx,
typed: map[schema2.GroupVersionKind]cache.SharedIndexInformer{}, typed: map[schema2.GroupVersionKind]cache.SharedIndexInformer{},
informerFactory: dynamicinformer.NewDynamicSharedInformerFactory(client, 2*time.Hour), informerFactory: metadatainformer.NewSharedInformerFactory(client, 2*time.Hour),
watchers: map[schema2.GroupVersionResource]*watcher{}, watchers: map[schema2.GroupVersionResource]*watcher{},
workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"), workqueue: workqueue.NewNamedDelayingQueue("cluster-cache"),
} }
@ -78,10 +73,6 @@ func NewClusterCache(ctx context.Context, client dynamic.Interface) ClusterCache
return c return c
} }
func (h *clusterCache) AddController(gvk schema2.GroupVersionKind, informer cache.SharedIndexInformer) {
h.typed[gvk] = informer
}
func validSchema(schema *types.APISchema) bool { func validSchema(schema *types.APISchema) bool {
canList := false canList := false
canWatch := false canWatch := false
@ -170,7 +161,7 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
w.start = true w.start = true
} }
logrus.Infof("Watching counts for %s", gvk.String()) logrus.Infof("Watching metadata for %s", gvk.String())
h.addResourceEventHandler(gvr, w.informer) h.addResourceEventHandler(gvr, w.informer)
name := fmt.Sprintf("meta %s", gvk) name := fmt.Sprintf("meta %s", gvk)
h.controllerFactory.AddHandler(ctx, gvk, w.informer, name, func(key string, obj runtime.Object) (object runtime.Object, e error) { h.controllerFactory.AddHandler(ctx, gvk, w.informer, name, func(key string, obj runtime.Object) (object runtime.Object, e error) {
@ -180,7 +171,7 @@ func (h *clusterCache) OnSchemas(schemas *schema.Collection) error {
for gvr, w := range h.watchers { for gvr, w := range h.watchers {
if !gvrs[gvr] { if !gvrs[gvr] {
logrus.Infof("Stopping count watch on %s", gvr) logrus.Infof("Stopping metadata watch on %s", gvr)
w.cancel() w.cancel()
delete(h.watchers, gvr) delete(h.watchers, gvr)
} }

View File

@ -10,9 +10,12 @@ import (
schema2 "github.com/rancher/steve/pkg/schema" schema2 "github.com/rancher/steve/pkg/schema"
"github.com/rancher/steve/pkg/schema/converter" "github.com/rancher/steve/pkg/schema/converter"
"github.com/rancher/steve/pkg/schemaserver/types" "github.com/rancher/steve/pkg/schemaserver/types"
"github.com/rancher/steve/pkg/server/resources/common"
apiextcontrollerv1beta1 "github.com/rancher/wrangler-api/pkg/generated/controllers/apiextensions.k8s.io/v1beta1" apiextcontrollerv1beta1 "github.com/rancher/wrangler-api/pkg/generated/controllers/apiextensions.k8s.io/v1beta1"
v1 "github.com/rancher/wrangler-api/pkg/generated/controllers/apiregistration.k8s.io/v1" v1 "github.com/rancher/wrangler-api/pkg/generated/controllers/apiregistration.k8s.io/v1"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
authorizationv1 "k8s.io/api/authorization/v1" authorizationv1 "k8s.io/api/authorization/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
@ -20,6 +23,10 @@ import (
apiv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" apiv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
) )
var (
listPool = semaphore.NewWeighted(10)
)
type SchemasHandler interface { type SchemasHandler interface {
OnSchemas(schemas *schema2.Collection) error OnSchemas(schemas *schema2.Collection) error
} }
@ -31,14 +38,14 @@ type handler struct {
toSync int32 toSync int32
schemas *schema2.Collection schemas *schema2.Collection
client discovery.DiscoveryInterface client discovery.DiscoveryInterface
cols *common.DynamicColumns
crd apiextcontrollerv1beta1.CustomResourceDefinitionClient crd apiextcontrollerv1beta1.CustomResourceDefinitionClient
ssar authorizationv1client.SelfSubjectAccessReviewInterface ssar authorizationv1client.SelfSubjectAccessReviewInterface
handler SchemasHandler handler SchemasHandler
running map[string]func()
} }
func Register(ctx context.Context, func Register(ctx context.Context,
cols *common.DynamicColumns,
discovery discovery.DiscoveryInterface, discovery discovery.DiscoveryInterface,
crd apiextcontrollerv1beta1.CustomResourceDefinitionController, crd apiextcontrollerv1beta1.CustomResourceDefinitionController,
apiService v1.APIServiceController, apiService v1.APIServiceController,
@ -48,12 +55,12 @@ func Register(ctx context.Context,
h := &handler{ h := &handler{
ctx: ctx, ctx: ctx,
cols: cols,
client: discovery, client: discovery,
schemas: schemas, schemas: schemas,
handler: schemasHandler, handler: schemasHandler,
crd: crd, crd: crd,
ssar: ssar, ssar: ssar,
running: map[string]func(){},
} }
apiService.OnChange(ctx, "schema", h.OnChangeAPIService) apiService.OnChange(ctx, "schema", h.OnChangeAPIService)
@ -105,6 +112,28 @@ func isListWatchable(schema *types.APISchema) bool {
return canList && canWatch return canList && canWatch
} }
func (h *handler) getColumns(ctx context.Context, schemas map[string]*types.APISchema) error {
eg := errgroup.Group{}
for _, schema := range schemas {
if !isListWatchable(schema) {
continue
}
if err := listPool.Acquire(ctx, 1); err != nil {
return err
}
s := schema
eg.Go(func() error {
defer listPool.Release(1)
return h.cols.SetColumns(s)
})
}
return eg.Wait()
}
func (h *handler) refreshAll() error { func (h *handler) refreshAll() error {
h.Lock() h.Lock()
defer h.Unlock() defer h.Unlock()
@ -131,7 +160,10 @@ func (h *handler) refreshAll() error {
filteredSchemas[id] = schema filteredSchemas[id] = schema
} }
h.startStopTemplate(filteredSchemas) if err := h.getColumns(h.ctx, filteredSchemas); err != nil {
return err
}
h.schemas.Reset(filteredSchemas) h.schemas.Reset(filteredSchemas)
if h.handler != nil { if h.handler != nil {
return h.handler.OnSchemas(h.schemas) return h.handler.OnSchemas(h.schemas)
@ -140,32 +172,6 @@ func (h *handler) refreshAll() error {
return nil return nil
} }
func (h *handler) startStopTemplate(schemas map[string]*types.APISchema) {
for id := range schemas {
if _, ok := h.running[id]; ok {
continue
}
template := h.schemas.TemplateForSchemaID(id)
if template == nil || template.Start == nil {
continue
}
subCtx, cancel := context.WithCancel(h.ctx)
if err := template.Start(subCtx); err != nil {
logrus.Errorf("failed to start schema template: %s", id)
continue
}
h.running[id] = cancel
}
for id, cancel := range h.running {
if _, ok := schemas[id]; !ok {
cancel()
delete(h.running, id)
}
}
}
func (h *handler) allowed(schema *types.APISchema) (bool, error) { func (h *handler) allowed(schema *types.APISchema) (bool, error) {
gvr := attributes.GVR(schema) gvr := attributes.GVR(schema)
ssar, err := h.ssar.Create(&authorizationv1.SelfSubjectAccessReview{ ssar, err := h.ssar.Create(&authorizationv1.SelfSubjectAccessReview{

View File

@ -1,39 +0,0 @@
package schema
import (
"fmt"
"github.com/rancher/steve/pkg/schemaserver/types"
"github.com/rancher/wrangler/pkg/data"
"github.com/rancher/wrangler/pkg/schemas"
"github.com/rancher/wrangler/pkg/schemas/mappers"
)
func newDefaultMapper() schemas.Mapper {
return &defaultMapper{}
}
type defaultMapper struct {
mappers.EmptyMapper
}
func (d *defaultMapper) FromInternal(data data.Object) {
if data["kind"] != "" && data["apiVersion"] != "" {
if t, ok := data["type"]; ok && data != nil {
data["_type"] = t
}
}
if _, ok := data["id"]; ok || data == nil {
return
}
name := types.Name(data)
namespace := types.Namespace(data)
if namespace == "" {
data["id"] = name
} else {
data["id"] = fmt.Sprintf("%s/%s", namespace, name)
}
}

View File

@ -9,10 +9,12 @@ import (
) )
type Column struct { type Column struct {
Name string `json:"name,omitempty"` Name string `json:"name,omitempty"`
Field string `json:"field,omitempty"` Field string `json:"field,omitempty"`
Type string `json:"type,omitempty"` Type string `json:"type,omitempty"`
Format string `json:"format,omitempty"` Format string `json:"format,omitempty"`
Description string `json:"description,omitempty"`
Priority int `json:"priority,omitempty"`
} }
type Table struct { type Table struct {

View File

@ -10,14 +10,14 @@ import (
type APISchemas struct { type APISchemas struct {
InternalSchemas *schemas.Schemas InternalSchemas *schemas.Schemas
Schemas map[string]*APISchema Schemas map[string]*APISchema
index map[string]*APISchema index map[string]*APISchema
} }
func EmptyAPISchemas() *APISchemas { func EmptyAPISchemas() *APISchemas {
return &APISchemas{ return &APISchemas{
InternalSchemas: schemas.EmptySchemas(), InternalSchemas: schemas.EmptySchemas(),
Schemas: map[string]*APISchema{}, Schemas: map[string]*APISchema{},
index:map[string]*APISchema{}, index: map[string]*APISchema{},
} }
} }

View File

@ -1,42 +0,0 @@
package common
import (
"github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/schema/table"
"github.com/rancher/steve/pkg/schemaserver/types"
"github.com/rancher/wrangler/pkg/schemas"
"github.com/rancher/wrangler/pkg/schemas/mappers"
)
var (
NameColumn = table.Column{
Name: "Name",
Field: "metadata.name",
Type: "string",
Format: "name",
}
CreatedColumn = table.Column{
Name: "Created",
Field: "metadata.creationTimestamp",
Type: "string",
Format: "date",
}
)
type DefaultColumns struct {
mappers.EmptyMapper
}
func (d *DefaultColumns) ModifySchema(schema *schemas.Schema, schemas *schemas.Schemas) error {
as := &types.APISchema{
Schema: schema,
}
if attributes.Columns(as) == nil {
attributes.SetColumns(as, []table.Column{
NameColumn,
CreatedColumn,
})
}
return nil
}

View File

@ -1,11 +1,12 @@
package common package common
import ( import (
"net/http" "fmt"
"github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/schema/table"
"github.com/rancher/steve/pkg/schemaserver/types" "github.com/rancher/steve/pkg/schemaserver/types"
"github.com/rancher/wrangler/pkg/ratelimit"
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@ -18,6 +19,11 @@ type DynamicColumns struct {
client *rest.RESTClient client *rest.RESTClient
} }
type ColumnDefinition struct {
metav1.TableColumnDefinition `json:",inline"`
Field string `json:"field,omitempty"`
}
func NewDynamicColumns(config *rest.Config) (*DynamicColumns, error) { func NewDynamicColumns(config *rest.Config) (*DynamicColumns, error) {
c, err := newClient(config) c, err := newClient(config)
if err != nil { if err != nil {
@ -28,15 +34,6 @@ func NewDynamicColumns(config *rest.Config) (*DynamicColumns, error) {
}, nil }, nil
} }
func hasGet(methods []string) bool {
for _, method := range methods {
if method == http.MethodGet {
return true
}
}
return false
}
func (d *DynamicColumns) SetColumns(schema *types.APISchema) error { func (d *DynamicColumns) SetColumns(schema *types.APISchema) error {
if attributes.Columns(schema) != nil { if attributes.Columns(schema) != nil {
return nil return nil
@ -46,11 +43,6 @@ func (d *DynamicColumns) SetColumns(schema *types.APISchema) error {
if gvr.Resource == "" { if gvr.Resource == "" {
return nil return nil
} }
nsed := attributes.Namespaced(schema)
if !hasGet(schema.CollectionMethods) {
return nil
}
r := d.client.Get() r := d.client.Get()
if gvr.Group == "" { if gvr.Group == "" {
@ -59,33 +51,29 @@ func (d *DynamicColumns) SetColumns(schema *types.APISchema) error {
r.Prefix("apis", gvr.Group) r.Prefix("apis", gvr.Group)
} }
r.Prefix(gvr.Version) r.Prefix(gvr.Version)
if nsed {
r.Prefix("namespaces", "default")
}
r.Prefix(gvr.Resource) r.Prefix(gvr.Resource)
r.VersionedParams(&metav1.ListOptions{
Limit: 1,
}, metav1.ParameterCodec)
obj, err := r.Do().Get() obj, err := r.Do().Get()
if err != nil { if err != nil {
return err return nil
} }
t, ok := obj.(*metav1.Table) t, ok := obj.(*metav1.Table)
if !ok { if !ok {
return nil return nil
} }
var cols []table.Column if len(t.ColumnDefinitions) > 0 {
for _, cd := range t.ColumnDefinitions { var cols []ColumnDefinition
cols = append(cols, table.Column{ for i, colDef := range t.ColumnDefinitions {
Name: cd.Name, cols = append(cols, ColumnDefinition{
Field: "metadata.computed.fields." + cd.Name, TableColumnDefinition: colDef,
Type: cd.Type, Field: fmt.Sprintf("$.metadata.fields[%d]", i),
Format: cd.Format, })
}) }
}
if len(cols) > 0 {
attributes.SetColumns(schema, cols) attributes.SetColumns(schema, cols)
schema.Attributes["server-side-column"] = "true"
} }
return nil return nil
@ -93,6 +81,9 @@ func (d *DynamicColumns) SetColumns(schema *types.APISchema) error {
func newClient(config *rest.Config) (*rest.RESTClient, error) { func newClient(config *rest.Config) (*rest.RESTClient, error) {
scheme := runtime.NewScheme() scheme := runtime.NewScheme()
if err := internalversion.AddToScheme(scheme); err != nil {
return nil, err
}
if err := metav1.AddMetaToScheme(scheme); err != nil { if err := metav1.AddMetaToScheme(scheme); err != nil {
return nil, err return nil, err
} }
@ -101,9 +92,9 @@ func newClient(config *rest.Config) (*rest.RESTClient, error) {
} }
config = rest.CopyConfig(config) config = rest.CopyConfig(config)
config.RateLimiter = ratelimit.None
config.UserAgent = rest.DefaultKubernetesUserAgent() config.UserAgent = rest.DefaultKubernetesUserAgent()
config.AcceptContentTypes = "application/json;as=Table;v=v1beta1;g=meta.k8s.io" config.AcceptContentTypes = "application/json;as=Table;v=v1;g=meta.k8s.io"
config.ContentType = "application/json;as=Table;v=v1beta1;g=meta.k8s.io"
config.GroupVersion = &schema.GroupVersion{} config.GroupVersion = &schema.GroupVersion{}
config.NegotiatedSerializer = serializer.NewCodecFactory(scheme) config.NegotiatedSerializer = serializer.NewCodecFactory(scheme)
config.APIPath = "/" config.APIPath = "/"

View File

@ -11,7 +11,6 @@ func DefaultTemplate(clientGetter proxy.ClientGetter) schema.Template {
return schema.Template{ return schema.Template{
Store: proxy.NewProxyStore(clientGetter), Store: proxy.NewProxyStore(clientGetter),
Formatter: Formatter, Formatter: Formatter,
Mapper: &DefaultColumns{},
} }
} }

View File

@ -14,6 +14,7 @@ import (
"github.com/rancher/steve/pkg/schemaserver/types" "github.com/rancher/steve/pkg/schemaserver/types"
"github.com/rancher/steve/pkg/server/handler" "github.com/rancher/steve/pkg/server/handler"
"github.com/rancher/steve/pkg/server/resources" "github.com/rancher/steve/pkg/server/resources"
"github.com/rancher/steve/pkg/server/resources/common"
) )
var ErrConfigRequired = errors.New("rest config is required") var ErrConfigRequired = errors.New("rest config is required")
@ -52,18 +53,24 @@ func setup(ctx context.Context, server *Server) (http.Handler, *schema.Collectio
return nil, nil, err return nil, nil, err
} }
ccache := clustercache.NewClusterCache(ctx, cf.DynamicClient()) ccache := clustercache.NewClusterCache(ctx, cf.MetadataClient())
server.BaseSchemas = resources.DefaultSchemas(server.BaseSchemas, server.K8s.Discovery(), ccache) server.BaseSchemas = resources.DefaultSchemas(server.BaseSchemas, server.K8s.Discovery(), ccache)
server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf)...) server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf)...)
asl := server.AccessSetLookup asl := server.AccessSetLookup
if asl == nil { if asl == nil {
asl = accesscontrol.NewAccessStore(server.RBAC) asl = accesscontrol.NewAccessStore(ctx, true, server.RBAC)
} }
sf := schema.NewCollection(server.BaseSchemas, asl) cols, err := common.NewDynamicColumns(server.RestConfig)
if err != nil {
return nil, nil, err
}
sf := schema.NewCollection(ctx, server.BaseSchemas, asl)
sync := schemacontroller.Register(ctx, sync := schemacontroller.Register(ctx,
cols,
server.K8s.Discovery(), server.K8s.Discovery(),
server.CRD.CustomResourceDefinition(), server.CRD.CustomResourceDefinition(),
server.API.APIService(), server.API.APIService(),

View File

@ -1,66 +0,0 @@
package proxy
import (
"fmt"
"github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/schemaserver/httperror"
"github.com/rancher/steve/pkg/schemaserver/types"
"github.com/rancher/wrangler/pkg/schemas/validation"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)
type ClientFactory struct {
cfg rest.Config
client dynamic.Interface
impersonate bool
idToGVR map[string]schema.GroupVersionResource
}
func NewClientFactory(cfg *rest.Config, impersonate bool) *ClientFactory {
return &ClientFactory{
impersonate: impersonate,
cfg: *cfg,
idToGVR: map[string]schema.GroupVersionResource{},
}
}
func (p *ClientFactory) Client(ctx *types.APIRequest, schema *types.APISchema) (dynamic.ResourceInterface, error) {
gvr := attributes.GVR(schema)
if gvr.Resource == "" {
return nil, httperror.NewAPIError(validation.NotFound, "Failed to find gvr for "+schema.ID)
}
user, ok := request.UserFrom(ctx.Request.Context())
if !ok {
return nil, fmt.Errorf("failed to find user context for client")
}
client, err := p.getClient(user)
if err != nil {
return nil, err
}
return client.Resource(gvr), nil
}
func (p *ClientFactory) getClient(user user.Info) (dynamic.Interface, error) {
if p.impersonate {
return p.client, nil
}
if user.GetName() == "" {
return nil, fmt.Errorf("failed to determine current user")
}
newCfg := p.cfg
newCfg.Impersonate.UserName = user.GetName()
newCfg.Impersonate.Groups = user.GetGroups()
newCfg.Impersonate.Extra = user.GetExtra()
return dynamic.NewForConfig(&newCfg)
}

View File

@ -96,7 +96,9 @@ func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, id string
return nil, err return nil, err
} }
return k8sClient.Get(id, opts) obj, err := k8sClient.Get(id, opts)
rowToObject(obj)
return obj, err
} }
func moveFromUnderscore(obj map[string]interface{}) map[string]interface{} { func moveFromUnderscore(obj map[string]interface{}) map[string]interface{} {
@ -130,6 +132,51 @@ func moveToUnderscore(obj *unstructured.Unstructured) *unstructured.Unstructured
return obj return obj
} }
func rowToObject(obj *unstructured.Unstructured) {
if obj.Object["kind"] != "Table" ||
obj.Object["apiVersion"] != "meta.k8s.io/v1" {
return
}
items := tableToObjects(obj.Object)
if len(items) == 1 {
obj.Object = items[0].Object
}
}
func tableToList(obj *unstructured.UnstructuredList) {
if obj.Object["kind"] != "Table" ||
obj.Object["apiVersion"] != "meta.k8s.io/v1" {
return
}
obj.Items = tableToObjects(obj.Object)
}
func tableToObjects(obj map[string]interface{}) []unstructured.Unstructured {
var result []unstructured.Unstructured
rows, _ := obj["rows"].([]interface{})
for _, row := range rows {
m, ok := row.(map[string]interface{})
if !ok {
continue
}
cells := m["cells"]
object, ok := m["object"].(map[string]interface{})
if !ok {
continue
}
data.PutValue(object, cells, "metadata", "fields")
result = append(result, unstructured.Unstructured{
Object: object,
})
}
return result
}
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
k8sClient, err := s.clientGetter.Client(apiOp, schema, apiOp.Namespace) k8sClient, err := s.clientGetter.Client(apiOp, schema, apiOp.Namespace)
if err != nil { if err != nil {
@ -146,6 +193,8 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
return types.APIObjectList{}, err return types.APIObjectList{}, err
} }
tableToList(resultList)
result := types.APIObjectList{ result := types.APIObjectList{
Revision: resultList.GetResourceVersion(), Revision: resultList.GetResourceVersion(),
Continue: resultList.GetContinue(), Continue: resultList.GetContinue(),
@ -230,6 +279,10 @@ func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et
name = types.CreateAPIEvent name = types.CreateAPIEvent
} }
if unstr, ok := obj.(*unstructured.Unstructured); ok {
rowToObject(unstr)
}
event := types.APIEvent{ event := types.APIEvent{
Name: name, Name: name,
Object: toAPI(schema, obj), Object: toAPI(schema, obj),