diff --git a/go.mod b/go.mod index 6460650..b00eae1 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/imdario/mergo v0.3.8 // indirect github.com/pborman/uuid v1.2.0 github.com/pkg/errors v0.9.1 - github.com/rancher/apiserver v0.0.0-20210727155917-6a723678dd3d + github.com/rancher/apiserver v0.0.0-20210818221223-fb33444dfae8 github.com/rancher/dynamiclistener v0.2.1-0.20200714201033-9c1939da3af9 github.com/rancher/kubernetes-provider-detector v0.1.2 github.com/rancher/norman v0.0.0-20210423002317-8e6ffc77a819 diff --git a/go.sum b/go.sum index c18e0eb..9974d78 100644 --- a/go.sum +++ b/go.sum @@ -444,8 +444,8 @@ github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULU github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/qri-io/starlib v0.4.2-0.20200213133954-ff2e8cd5ef8d/go.mod h1:7DPO4domFU579Ga6E61sB9VFNaniPVwJP5C4bBCu3wA= -github.com/rancher/apiserver v0.0.0-20210727155917-6a723678dd3d h1:mRiUiDgpF+b6QCI9rJpanYNCnZ5VJgPnD5HNtj/bX+Y= -github.com/rancher/apiserver v0.0.0-20210727155917-6a723678dd3d/go.mod h1:8W0EwaR9dH5NDFw6mpAX437D0q+EZqKWbZyX71+z2WI= +github.com/rancher/apiserver v0.0.0-20210818221223-fb33444dfae8 h1:Lg2urAlvMUO+sH8tFQMZzsgn+wP5hVjkVghVQtobqow= +github.com/rancher/apiserver v0.0.0-20210818221223-fb33444dfae8/go.mod h1:8W0EwaR9dH5NDFw6mpAX437D0q+EZqKWbZyX71+z2WI= github.com/rancher/client-go v1.20.0-rancher.1 h1:B85UDTIx+0XgOyv0obL9HJSNdY3mNBi1+wm26TOQZ8o= github.com/rancher/client-go v1.20.0-rancher.1/go.mod h1:UTdyXFcu9VZV4qQRKGXCa0KdMX4HTCXClRs4s7yFdDQ= github.com/rancher/dynamiclistener v0.2.1-0.20200714201033-9c1939da3af9 h1:Mo5mPXi7k/TgzMcUIuDpbNxiX2bYh68+yEpaur5Nx80= diff --git a/pkg/aggregation/server.go b/pkg/aggregation/server.go index 9d39c3d..b0b3a74 100644 --- a/pkg/aggregation/server.go +++ b/pkg/aggregation/server.go @@ -60,12 +60,24 @@ func ListenAndServe(ctx context.Context, url string, caCert []byte, token string func serve(ctx context.Context, dialer websocket.Dialer, url string, headers http.Header, handler http.Handler) error { url = strings.Replace(url, "http://", "ws://", 1) url = strings.Replace(url, "https://", "wss://", 1) - conn, _, err := dialer.DialContext(ctx, url, headers) + + // ensure we clean up everything on exit + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + dialCtx, dialCancel := context.WithTimeout(ctx, 5*time.Second) + defer dialCancel() + conn, _, err := dialer.DialContext(dialCtx, url, headers) if err != nil { return err } defer conn.Close() + go func() { + <-ctx.Done() + conn.Close() + }() + listener := NewListener("steve") server := http.Server{ Handler: handler, diff --git a/pkg/aggregation/watch.go b/pkg/aggregation/watch.go index 5f42400..263b65d 100644 --- a/pkg/aggregation/watch.go +++ b/pkg/aggregation/watch.go @@ -85,7 +85,7 @@ func (h *handler) shouldRestart(secret *corev1.Secret) (string, []byte, string, if h.url != url || h.token != token || - bytes.Equal(h.caCert, caCert) { + !bytes.Equal(h.caCert, caCert) { return url, caCert, token, true, nil } diff --git a/pkg/attributes/attributes.go b/pkg/attributes/attributes.go index f07034b..81be0b3 100644 --- a/pkg/attributes/attributes.go +++ b/pkg/attributes/attributes.go @@ -127,6 +127,25 @@ func Access(s *types.APISchema) interface{} { return s.Attributes["access"] } +func AddDisallowMethods(s *types.APISchema, methods ...string) { + data, ok := s.Attributes["disallowMethods"].(map[string]bool) + if !ok { + data = map[string]bool{} + s.Attributes["disallowMethods"] = data + } + for _, method := range methods { + data[method] = true + } +} + +func DisallowMethods(s *types.APISchema) map[string]bool { + data, ok := s.Attributes["disallowMethods"].(map[string]bool) + if !ok { + return nil + } + return data +} + func SetAPIResource(s *types.APISchema, resource v1.APIResource) { SetResource(s, resource.Name) SetVerbs(s, resource.Verbs) diff --git a/pkg/resources/common/formatter.go b/pkg/resources/common/formatter.go index 1ffdc32..7ea90b2 100644 --- a/pkg/resources/common/formatter.go +++ b/pkg/resources/common/formatter.go @@ -73,6 +73,14 @@ func formatter(summarycache *summarycache.SummaryCache) types.Formatter { resource.Links["update"] = u } + if _, ok := resource.Links["update"]; !ok && slice.ContainsString(resource.Schema.ResourceMethods, "blocked-PUT") { + resource.Links["update"] = "blocked" + } + + if _, ok := resource.Links["remove"]; !ok && slice.ContainsString(resource.Schema.ResourceMethods, "blocked-DELETE") { + resource.Links["remove"] = "blocked" + } + if unstr, ok := resource.APIObject.Object.(*unstructured.Unstructured); ok { s, rel := summarycache.SummaryAndRelationship(unstr) data.PutValue(unstr.Object, map[string]interface{}{ diff --git a/pkg/resources/schema.go b/pkg/resources/schema.go index 40fbdf4..ff3fee1 100644 --- a/pkg/resources/schema.go +++ b/pkg/resources/schema.go @@ -19,13 +19,23 @@ import ( steveschema "github.com/rancher/steve/pkg/schema" "github.com/rancher/steve/pkg/stores/proxy" "github.com/rancher/steve/pkg/summarycache" + "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/discovery" ) func DefaultSchemas(ctx context.Context, baseSchema *types.APISchemas, ccache clustercache.ClusterCache, - cg proxy.ClientGetter, schemaFactory steveschema.Factory) error { + cg proxy.ClientGetter, schemaFactory steveschema.Factory, serverVersion string) error { counts.Register(baseSchema, ccache) - subscribe.Register(baseSchema) + subscribe.Register(baseSchema, func(apiOp *types.APIRequest) *types.APISchemas { + user, ok := request.UserFrom(apiOp.Context()) + if ok { + schemas, err := schemaFactory.Schemas(user) + if err == nil { + return schemas + } + } + return apiOp.Schemas + }, serverVersion) apiroot.Register(baseSchema, []string{"v1"}, "proxy:/apis") cluster.Register(ctx, baseSchema, cg, schemaFactory) userpreferences.Register(baseSchema) diff --git a/pkg/resources/schemas/template.go b/pkg/resources/schemas/template.go index b683aad..50b7bb8 100644 --- a/pkg/resources/schemas/template.go +++ b/pkg/resources/schemas/template.go @@ -6,6 +6,7 @@ import ( "time" "github.com/rancher/apiserver/pkg/builtin" + "k8s.io/apimachinery/pkg/api/equality" schemastore "github.com/rancher/apiserver/pkg/store/schema" "github.com/rancher/apiserver/pkg/types" @@ -96,12 +97,24 @@ func (s *Store) sendSchemas(result chan types.APIEvent, apiOp *types.APIRequest, inNewSchemas := map[string]bool{} for _, apiObject := range schemastore.FilterSchemas(apiOp, schemas.Schemas).Objects { + inNewSchemas[apiObject.ID] = true + eventName := types.ChangeAPIEvent + if oldSchema := oldSchemas.LookupSchema(apiObject.ID); oldSchema == nil { + eventName = types.CreateAPIEvent + } else { + newSchemaCopy := apiObject.Object.(*types.APISchema).Schema.DeepCopy() + oldSchemaCopy := oldSchema.Schema.DeepCopy() + newSchemaCopy.Mapper = nil + oldSchemaCopy.Mapper = nil + if equality.Semantic.DeepEqual(newSchemaCopy, oldSchemaCopy) { + continue + } + } result <- types.APIEvent{ - Name: types.ChangeAPIEvent, + Name: eventName, ResourceType: "schema", Object: apiObject, } - inNewSchemas[apiObject.ID] = true } for _, oldSchema := range schemastore.FilterSchemas(apiOp, oldSchemas.Schemas).Objects { diff --git a/pkg/schema/factory.go b/pkg/schema/factory.go index 3b9f150..b16af18 100644 --- a/pkg/schema/factory.go +++ b/pkg/schema/factory.go @@ -99,21 +99,28 @@ func (c *Collection) schemasForSubject(access *accesscontrol.AccessSet) (*types. } } + allowed := func(method string) string { + if attributes.DisallowMethods(s)[method] { + return "blocked-" + method + } + return method + } + s = s.DeepCopy() attributes.SetAccess(s, verbAccess) if verbAccess.AnyVerb("list", "get") { - s.ResourceMethods = append(s.ResourceMethods, http.MethodGet) - s.CollectionMethods = append(s.CollectionMethods, http.MethodGet) + s.ResourceMethods = append(s.ResourceMethods, allowed(http.MethodGet)) + s.CollectionMethods = append(s.CollectionMethods, allowed(http.MethodGet)) } if verbAccess.AnyVerb("delete") { - s.ResourceMethods = append(s.ResourceMethods, http.MethodDelete) + s.ResourceMethods = append(s.ResourceMethods, allowed(http.MethodDelete)) } if verbAccess.AnyVerb("update") { - s.ResourceMethods = append(s.ResourceMethods, http.MethodPut) - s.ResourceMethods = append(s.ResourceMethods, http.MethodPatch) + s.ResourceMethods = append(s.ResourceMethods, allowed(http.MethodPut)) + s.ResourceMethods = append(s.ResourceMethods, allowed(http.MethodPatch)) } if verbAccess.AnyVerb("create") { - s.CollectionMethods = append(s.CollectionMethods, http.MethodPost) + s.CollectionMethods = append(s.CollectionMethods, allowed(http.MethodPost)) } if len(s.CollectionMethods) == 0 && len(s.ResourceMethods) == 0 { diff --git a/pkg/server/server.go b/pkg/server/server.go index e8615d8..1a54a9f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -37,6 +37,7 @@ type Server struct { AccessSetLookup accesscontrol.AccessSetLookup APIServer *apiserver.Server ClusterRegistry string + Version string authMiddleware auth.Middleware controllers *Controllers @@ -59,6 +60,7 @@ type Options struct { AggregationSecretNamespace string AggregationSecretName string ClusterRegistry string + ServerVersion string } func New(ctx context.Context, restConfig *rest.Config, opts *Options) (*Server, error) { @@ -77,6 +79,7 @@ func New(ctx context.Context, restConfig *rest.Config, opts *Options) (*Server, aggregationSecretNamespace: opts.AggregationSecretNamespace, aggregationSecretName: opts.AggregationSecretName, ClusterRegistry: opts.ClusterRegistry, + Version: opts.ServerVersion, } if err := setup(ctx, server); err != nil { @@ -135,7 +138,7 @@ func setup(ctx context.Context, server *Server) error { server.ClusterCache = ccache sf := schema.NewCollection(ctx, server.BaseSchemas, asl) - if err = resources.DefaultSchemas(ctx, server.BaseSchemas, ccache, server.ClientFactory, sf); err != nil { + if err = resources.DefaultSchemas(ctx, server.BaseSchemas, ccache, server.ClientFactory, sf, server.Version); err != nil { return err } diff --git a/pkg/stores/proxy/proxy_store.go b/pkg/stores/proxy/proxy_store.go index 8be0bb3..80e8aa5 100644 --- a/pkg/stores/proxy/proxy_store.go +++ b/pkg/stores/proxy/proxy_store.go @@ -282,21 +282,8 @@ func returnErr(err error, c chan types.APIEvent) { func (s *Store) listAndWatch(apiOp *types.APIRequest, k8sClient dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan types.APIEvent) { rev := w.Revision - if rev == "-1" { + if rev == "-1" || rev == "0" { rev = "" - } else { - // ensure the revision is valid or get the latest one - list, err := k8sClient.List(apiOp.Context(), metav1.ListOptions{ - Limit: 1, - ResourceVersion: rev, - }) - if err != nil { - returnErr(errors.Wrapf(err, "failed to list %s", schema.ID), result) - return - } - if rev == "" { - rev = list.GetResourceVersion() - } } timeout := int64(60 * 30) diff --git a/pkg/stores/proxy/watch_refresh.go b/pkg/stores/proxy/watch_refresh.go index 87ecda0..7674a16 100644 --- a/pkg/stores/proxy/watch_refresh.go +++ b/pkg/stores/proxy/watch_refresh.go @@ -29,7 +29,7 @@ func (w *WatchRefresh) Watch(apiOp *types.APIRequest, schema *types.APISchema, w select { case <-ctx.Done(): return - case <-time.After(30 * time.Second): + case <-time.After(2 * time.Second): } newAs := w.asl.AccessFor(user)