mirror of
https://github.com/niusmallnan/steve.git
synced 2026-01-29 21:38:35 +00:00
Drop non-preferred version and version in the schema id
This commit is contained in:
@@ -5,7 +5,6 @@ import (
|
||||
"github.com/rancher/steve/pkg/attributes"
|
||||
"github.com/rancher/steve/pkg/schema"
|
||||
"github.com/rancher/steve/pkg/schemaserver/types"
|
||||
runtimeschema "k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
func k8sAPI(sf schema.Factory, apiOp *types.APIRequest) {
|
||||
@@ -16,11 +15,7 @@ func k8sAPI(sf schema.Factory, apiOp *types.APIRequest) {
|
||||
}
|
||||
|
||||
apiOp.Name = vars["name"]
|
||||
apiOp.Type = sf.ByGVR(runtimeschema.GroupVersionResource{
|
||||
Version: vars["version"],
|
||||
Group: group,
|
||||
Resource: vars["resource"],
|
||||
})
|
||||
apiOp.Type = vars["type"]
|
||||
|
||||
nOrN := vars["nameorns"]
|
||||
if nOrN != "" {
|
||||
|
||||
@@ -3,21 +3,26 @@ package apigroups
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/rancher/steve/pkg/schema"
|
||||
|
||||
"github.com/rancher/steve/pkg/schemaserver/store/empty"
|
||||
"github.com/rancher/steve/pkg/schemaserver/types"
|
||||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/discovery"
|
||||
)
|
||||
|
||||
func Register(schemas *types.APISchemas, discovery discovery.DiscoveryInterface) {
|
||||
schemas.MustImportAndCustomize(v1.APIGroup{}, func(schema *types.APISchema) {
|
||||
schema.CollectionMethods = []string{http.MethodGet}
|
||||
schema.ResourceMethods = []string{http.MethodGet}
|
||||
schema.Store = NewStore(discovery)
|
||||
schema.Formatter = func(request *types.APIRequest, resource *types.RawResource) {
|
||||
func Template(discovery discovery.DiscoveryInterface) schema.Template {
|
||||
return schema.Template{
|
||||
ID: "apigroup",
|
||||
Customize: func(apiSchema *types.APISchema) {
|
||||
apiSchema.CollectionMethods = []string{http.MethodGet}
|
||||
apiSchema.ResourceMethods = []string{http.MethodGet}
|
||||
},
|
||||
Formatter: func(request *types.APIRequest, resource *types.RawResource) {
|
||||
resource.ID = resource.APIObject.Data().String("name")
|
||||
}
|
||||
})
|
||||
},
|
||||
Store: NewStore(discovery),
|
||||
}
|
||||
}
|
||||
|
||||
type Store struct {
|
||||
|
||||
@@ -58,6 +58,7 @@ func (d *DynamicColumns) SetColumns(schema *types.APISchema) error {
|
||||
|
||||
obj, err := r.Do().Get()
|
||||
if err != nil {
|
||||
attributes.SetTable(schema, false)
|
||||
return nil
|
||||
}
|
||||
t, ok := obj.(*metav1.Table)
|
||||
|
||||
@@ -14,16 +14,16 @@ import (
|
||||
"k8s.io/client-go/discovery"
|
||||
)
|
||||
|
||||
func DefaultSchemas(baseSchema *types.APISchemas, discovery discovery.DiscoveryInterface, ccache clustercache.ClusterCache) *types.APISchemas {
|
||||
func DefaultSchemas(baseSchema *types.APISchemas, ccache clustercache.ClusterCache) *types.APISchemas {
|
||||
counts.Register(baseSchema, ccache)
|
||||
subscribe.Register(baseSchema)
|
||||
apigroups.Register(baseSchema, discovery)
|
||||
apiroot.Register(baseSchema, []string{"v1"}, []string{"proxy:/apis"})
|
||||
return baseSchema
|
||||
}
|
||||
|
||||
func DefaultSchemaTemplates(cf *client.Factory, lookup accesscontrol.AccessSetLookup) []schema.Template {
|
||||
func DefaultSchemaTemplates(cf *client.Factory, lookup accesscontrol.AccessSetLookup, discovery discovery.DiscoveryInterface) []schema.Template {
|
||||
return []schema.Template{
|
||||
common.DefaultTemplate(cf, lookup),
|
||||
apigroups.Template(discovery),
|
||||
}
|
||||
}
|
||||
|
||||
158
pkg/server/resources/schemas/template.go
Normal file
158
pkg/server/resources/schemas/template.go
Normal file
@@ -0,0 +1,158 @@
|
||||
package schemas
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rancher/steve/pkg/schemaserver/builtin"
|
||||
|
||||
"github.com/rancher/steve/pkg/accesscontrol"
|
||||
"github.com/rancher/steve/pkg/schema"
|
||||
schemastore "github.com/rancher/steve/pkg/schemaserver/store/schema"
|
||||
"github.com/rancher/steve/pkg/schemaserver/types"
|
||||
"github.com/rancher/wrangler/pkg/broadcast"
|
||||
"github.com/rancher/wrangler/pkg/schemas/validation"
|
||||
"github.com/sirupsen/logrus"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
func SetupWatcher(ctx context.Context, schemas *types.APISchemas, asl accesscontrol.AccessSetLookup, factory schema.Factory) {
|
||||
// one instance shared with all stores
|
||||
notifier := schemaChangeNotifier(ctx, factory)
|
||||
|
||||
schema := builtin.Schema
|
||||
schema.Store = &Store{
|
||||
Store: schema.Store,
|
||||
asl: asl,
|
||||
sf: factory,
|
||||
schemaChangeNotify: notifier,
|
||||
}
|
||||
|
||||
schemas.AddSchema(schema)
|
||||
}
|
||||
|
||||
type Store struct {
|
||||
types.Store
|
||||
|
||||
asl accesscontrol.AccessSetLookup
|
||||
sf schema.Factory
|
||||
schemaChangeNotify func(context.Context) (chan interface{}, error)
|
||||
}
|
||||
|
||||
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
|
||||
user, ok := request.UserFrom(apiOp.Request.Context())
|
||||
if !ok {
|
||||
return nil, validation.Unauthorized
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
result := make(chan types.APIEvent)
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(result)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c, err := s.schemaChangeNotify(apiOp.Context())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
schemas, err := s.sf.Schemas(user)
|
||||
if err != nil {
|
||||
logrus.Errorf("failed to generate schemas for user %v: %v", user, err)
|
||||
return
|
||||
}
|
||||
for range c {
|
||||
schemas = s.sendSchemas(result, apiOp, user, schemas)
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
schemas, err := s.sf.Schemas(user)
|
||||
if err != nil {
|
||||
logrus.Errorf("failed to generate schemas for notify user %v: %v", user, err)
|
||||
return
|
||||
}
|
||||
for range s.userChangeNotify(apiOp.Context(), user) {
|
||||
schemas = s.sendSchemas(result, apiOp, user, schemas)
|
||||
}
|
||||
}()
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *Store) sendSchemas(result chan types.APIEvent, apiOp *types.APIRequest, user user.Info, oldSchemas *types.APISchemas) *types.APISchemas {
|
||||
schemas, err := s.sf.Schemas(user)
|
||||
if err != nil {
|
||||
logrus.Errorf("failed to get schemas for %v: %v", user, err)
|
||||
return oldSchemas
|
||||
}
|
||||
|
||||
for _, apiObject := range schemastore.FilterSchemas(apiOp, schemas.Schemas).Objects {
|
||||
result <- types.APIEvent{
|
||||
Name: types.ChangeAPIEvent,
|
||||
ResourceType: "schema",
|
||||
Object: apiObject,
|
||||
}
|
||||
delete(oldSchemas.Schemas, apiObject.ID)
|
||||
}
|
||||
|
||||
for id, oldSchema := range oldSchemas.Schemas {
|
||||
result <- types.APIEvent{
|
||||
Name: types.ChangeAPIEvent,
|
||||
ResourceType: "schema",
|
||||
Object: types.APIObject{
|
||||
Type: "schema",
|
||||
ID: id,
|
||||
Object: oldSchema,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return schemas
|
||||
}
|
||||
|
||||
func (s *Store) userChangeNotify(ctx context.Context, user user.Info) chan interface{} {
|
||||
as := s.asl.AccessFor(user)
|
||||
result := make(chan interface{})
|
||||
go func() {
|
||||
defer close(result)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(2 * time.Second):
|
||||
}
|
||||
|
||||
newAS := s.asl.AccessFor(user)
|
||||
if newAS.ID != as.ID {
|
||||
result <- struct{}{}
|
||||
as = newAS
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func schemaChangeNotifier(ctx context.Context, factory schema.Factory) func(ctx context.Context) (chan interface{}, error) {
|
||||
notify := make(chan interface{})
|
||||
bcast := &broadcast.Broadcaster{}
|
||||
factory.OnChange(ctx, func() {
|
||||
select {
|
||||
case notify <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
})
|
||||
return func(ctx context.Context) (chan interface{}, error) {
|
||||
return bcast.Subscribe(ctx, func() (chan interface{}, error) {
|
||||
return notify, nil
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -26,14 +26,11 @@ func Routes(h Handlers) http.Handler {
|
||||
m.Path("/").Handler(h.APIRoot).HeadersRegexp("Accepts", ".*json.*")
|
||||
m.Path("/{name:v1}").Handler(h.APIRoot)
|
||||
|
||||
m.Path("/v1/{group}.{version}.{resource}").Handler(h.K8sResource)
|
||||
m.Path("/v1/{group}.{version}.{resource}/{nameorns}").Handler(h.K8sResource)
|
||||
m.Path("/v1/{group}.{version}.{resource}/{namespace}/{name}").Handler(h.K8sResource)
|
||||
m.Path("/v1/{group}.{version}.{resource}/{nameorns}").Queries("action", "{action}").Handler(h.K8sResource)
|
||||
m.Path("/v1/{group}.{version}.{resource}/{namespace}/{name}").Queries("action", "{action}").Handler(h.K8sResource)
|
||||
m.Path("/v1/{type:schemas}/{name:.*}").Handler(h.GenericResource)
|
||||
m.Path("/v1/{type}").Handler(h.GenericResource)
|
||||
m.Path("/v1/{type}/{name}").Handler(h.GenericResource)
|
||||
m.Path("/v1/{type}").Handler(h.K8sResource)
|
||||
m.Path("/v1/{type}/{nameorns}").Handler(h.K8sResource)
|
||||
m.Path("/v1/{type}/{namespace}/{name}").Handler(h.K8sResource)
|
||||
m.Path("/v1/{type}/{nameorns}").Queries("action", "{action}").Handler(h.K8sResource)
|
||||
m.Path("/v1/{type}/{namespace}/{name}").Queries("action", "{action}").Handler(h.K8sResource)
|
||||
m.Path("/api").Handler(h.K8sProxy) // Can't just prefix this as UI needs /apikeys path
|
||||
m.PathPrefix("/api/").Handler(h.K8sProxy)
|
||||
m.PathPrefix("/apis").Handler(h.K8sProxy)
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/rancher/steve/pkg/server/resources/schemas"
|
||||
|
||||
"github.com/rancher/dynamiclistener/server"
|
||||
"github.com/rancher/steve/pkg/accesscontrol"
|
||||
"github.com/rancher/steve/pkg/client"
|
||||
@@ -66,8 +68,8 @@ func setup(ctx context.Context, server *Server) (http.Handler, *schema.Collectio
|
||||
|
||||
ccache := clustercache.NewClusterCache(ctx, cf.MetadataClient())
|
||||
|
||||
server.BaseSchemas = resources.DefaultSchemas(server.BaseSchemas, server.K8s.Discovery(), ccache)
|
||||
server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf, asl)...)
|
||||
server.BaseSchemas = resources.DefaultSchemas(server.BaseSchemas, ccache)
|
||||
server.SchemaTemplates = append(server.SchemaTemplates, resources.DefaultSchemaTemplates(cf, asl, server.K8s.Discovery())...)
|
||||
|
||||
cols, err := common.NewDynamicColumns(server.RestConfig)
|
||||
if err != nil {
|
||||
@@ -75,6 +77,9 @@ func setup(ctx context.Context, server *Server) (http.Handler, *schema.Collectio
|
||||
}
|
||||
|
||||
sf := schema.NewCollection(ctx, server.BaseSchemas, asl)
|
||||
|
||||
schemas.SetupWatcher(ctx, server.BaseSchemas, asl, sf)
|
||||
|
||||
sync := schemacontroller.Register(ctx,
|
||||
cols,
|
||||
server.K8s.Discovery(),
|
||||
|
||||
Reference in New Issue
Block a user