1
0
mirror of https://github.com/rancher/steve.git synced 2025-06-22 13:07:27 +00:00

Merge pull request #60 from cmurphy/cleanup

Cleanup and docs
This commit is contained in:
Colleen Murphy 2022-10-31 11:25:08 -07:00 committed by GitHub
commit a10fe811f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 120 additions and 138 deletions

View File

@ -1,7 +1,7 @@
package accesscontrol package accesscontrol
import ( import (
"github.com/rancher/apiserver/pkg/server" apiserver "github.com/rancher/apiserver/pkg/server"
"github.com/rancher/apiserver/pkg/types" "github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/attributes"
"github.com/rancher/wrangler/pkg/kv" "github.com/rancher/wrangler/pkg/kv"
@ -9,7 +9,7 @@ import (
) )
type AccessControl struct { type AccessControl struct {
server.SchemaBasedAccess apiserver.SchemaBasedAccess
} }
func NewAccessControl() *AccessControl { func NewAccessControl() *AccessControl {

View File

@ -5,7 +5,6 @@ import (
"net/url" "net/url"
"strings" "strings"
"github.com/rancher/wrangler/pkg/kubeconfig"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
@ -14,17 +13,6 @@ import (
"k8s.io/client-go/transport" "k8s.io/client-go/transport"
) )
// Mostly copied from "kubectl proxy" code
func HandlerFromConfig(prefix, kubeConfig string) (http.Handler, error) {
loader := kubeconfig.GetInteractiveClientConfig(kubeConfig)
cfg, err := loader.ClientConfig()
if err != nil {
return nil, err
}
return Handler(prefix, cfg)
}
func ImpersonatingHandler(prefix string, cfg *rest.Config) http.Handler { func ImpersonatingHandler(prefix string, cfg *rest.Config) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
impersonate(rw, req, prefix, cfg) impersonate(rw, req, prefix, cfg)

View File

@ -6,7 +6,7 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/rancher/apiserver/pkg/server" apiserver "github.com/rancher/apiserver/pkg/server"
"github.com/rancher/apiserver/pkg/types" "github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/accesscontrol" "github.com/rancher/steve/pkg/accesscontrol"
"github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/attributes"
@ -55,7 +55,7 @@ type Template struct {
StoreFactory func(types.Store) types.Store StoreFactory func(types.Store) types.Store
} }
func WrapServer(factory Factory, server *server.Server) http.Handler { func WrapServer(factory Factory, server *apiserver.Server) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
user, ok := request.UserFrom(req.Context()) user, ok := request.UserFrom(req.Context())
if !ok { if !ok {

View File

@ -3,7 +3,6 @@ package handler
import ( import (
"net/http" "net/http"
"github.com/rancher/apiserver/pkg/server"
apiserver "github.com/rancher/apiserver/pkg/server" apiserver "github.com/rancher/apiserver/pkg/server"
"github.com/rancher/apiserver/pkg/types" "github.com/rancher/apiserver/pkg/types"
"github.com/rancher/apiserver/pkg/urlbuilder" "github.com/rancher/apiserver/pkg/urlbuilder"
@ -26,7 +25,7 @@ func New(cfg *rest.Config, sf schema.Factory, authMiddleware auth.Middleware, ne
a := &apiServer{ a := &apiServer{
sf: sf, sf: sf,
server: server.DefaultAPIServer(), server: apiserver.DefaultAPIServer(),
} }
a.server.AccessControl = accesscontrol.NewAccessControl() a.server.AccessControl = accesscontrol.NewAccessControl()
@ -55,7 +54,7 @@ func New(cfg *rest.Config, sf schema.Factory, authMiddleware auth.Middleware, ne
type apiServer struct { type apiServer struct {
sf schema.Factory sf schema.Factory
server *server.Server server *apiserver.Server
} }
func (a *apiServer) common(rw http.ResponseWriter, req *http.Request) (*types.APIRequest, bool) { func (a *apiServer) common(rw http.ResponseWriter, req *http.Request) (*types.APIRequest, bool) {

View File

@ -22,7 +22,7 @@ func Routes(h Handlers) http.Handler {
m.StrictSlash(true) m.StrictSlash(true)
m.Use(urlbuilder.RedirectRewrite) m.Use(urlbuilder.RedirectRewrite)
m.Path("/").Handler(h.APIRoot).HeadersRegexp("Accepts", ".*json.*") m.Path("/").Handler(h.APIRoot).HeadersRegexp("Accept", ".*json.*")
m.Path("/{name:v1}").Handler(h.APIRoot) m.Path("/{name:v1}").Handler(h.APIRoot)
m.Path("/v1/{type}").Handler(h.K8sResource) m.Path("/v1/{type}").Handler(h.K8sResource)

View File

@ -10,29 +10,42 @@ import (
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
) )
// Partition represents a named grouping of kubernetes resources,
// such as by namespace or a set of names.
type Partition interface { type Partition interface {
Name() string Name() string
} }
// ParallelPartitionLister defines how a set of partitions will be queried.
type ParallelPartitionLister struct { type ParallelPartitionLister struct {
// Lister is the lister method for a single partition.
Lister PartitionLister Lister PartitionLister
// Concurrency is the weight of the semaphore.
Concurrency int64 Concurrency int64
// Partitions is the set of partitions that will be concurrently queried.
Partitions []Partition Partitions []Partition
state *listState state *listState
revision string revision string
err error err error
} }
// PartitionLister lists objects for one partition.
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error)
// Err returns the latest error encountered.
func (p *ParallelPartitionLister) Err() error { func (p *ParallelPartitionLister) Err() error {
return p.err return p.err
} }
// Revision returns the revision for the current list state.
func (p *ParallelPartitionLister) Revision() string { func (p *ParallelPartitionLister) Revision() string {
return p.revision return p.revision
} }
// Continue returns the encoded continue token based on the current list state.
func (p *ParallelPartitionLister) Continue() string { func (p *ParallelPartitionLister) Continue() string {
if p.state == nil { if p.state == nil {
return "" return ""
@ -56,6 +69,9 @@ func indexOrZero(partitions []Partition, name string) int {
return 0 return 0
} }
// List returns a stream of objects up to the requested limit.
// If the continue token is not empty, it decodes it and returns the stream
// starting at the indicated marker.
func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []types.APIObject, error) { func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []types.APIObject, error) {
var state listState var state listState
if resume != "" { if resume != "" {
@ -77,14 +93,33 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st
return result, nil return result, nil
} }
// listState is a representation of the continuation point for a partial list.
// It is encoded as the continue token in the returned response.
type listState struct { type listState struct {
// Revision is the resourceVersion for the List object.
Revision string `json:"r,omitempty"` Revision string `json:"r,omitempty"`
// PartitionName is the name of the partition.
PartitionName string `json:"p,omitempty"` PartitionName string `json:"p,omitempty"`
// Continue is the continue token returned from Kubernetes for a partially filled list request.
// It is a subfield of the continue token returned from steve.
Continue string `json:"c,omitempty"` Continue string `json:"c,omitempty"`
// Offset is the offset from the start of the list within the partition to begin the result list.
Offset int `json:"o,omitempty"` Offset int `json:"o,omitempty"`
// Limit is the maximum number of items from all partitions to return in the result.
Limit int `json:"l,omitempty"` Limit int `json:"l,omitempty"`
} }
// feeder spawns a goroutine to list resources in each partition and feeds the
// results, in order by partition index, into a channel.
// If the sum of the results from all partitions (by namespaces or names) is
// greater than the limit parameter from the user request or the default of
// 100000, the result is truncated and a continue token is generated that
// indicates the partition and offset for the client to start on in the next
// request.
func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []types.APIObject) { func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []types.APIObject) {
var ( var (
sem = semaphore.NewWeighted(p.Concurrency) sem = semaphore.NewWeighted(p.Concurrency)
@ -116,6 +151,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
// setup a linked list of channel to control insertion order // setup a linked list of channel to control insertion order
last = next last = next
// state.Revision is decoded from the continue token, there won't be a revision on the first request.
if state.Revision == "" { if state.Revision == "" {
// don't have a revision yet so grab all tickets to set a revision // don't have a revision yet so grab all tickets to set a revision
tickets = 3 tickets = 3
@ -125,7 +161,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
break break
} }
// make state local // make state local for this partition
state := state state := state
eg.Go(func() error { eg.Go(func() error {
defer sem.Release(tickets) defer sem.Release(tickets)
@ -154,10 +190,13 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
p.revision = list.Revision p.revision = list.Revision
} }
// We have already seen the first objects in the list, truncate up to the offset.
if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Objects) { if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Objects) {
list.Objects = list.Objects[state.Offset:] list.Objects = list.Objects[state.Offset:]
} }
// Case 1: the capacity has been reached across all goroutines but the list is still only partial,
// so save the state so that the next page can be requested later.
if len(list.Objects) > capacity { if len(list.Objects) > capacity {
result <- list.Objects[:capacity] result <- list.Objects[:capacity]
// save state to redo this list at this offset // save state to redo this list at this offset
@ -173,10 +212,13 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
} }
result <- list.Objects result <- list.Objects
capacity -= len(list.Objects) capacity -= len(list.Objects)
// Case 2: all objects have been returned, we are done.
if list.Continue == "" { if list.Continue == "" {
return nil return nil
} }
// loop again and get more data // Case 3: we started at an offset and truncated the list to skip the objects up to the offset.
// We're not yet up to capacity and have not retrieved every object,
// so loop again and get more data.
state.Continue = list.Continue state.Continue = list.Continue
state.PartitionName = partition.Name() state.PartitionName = partition.Name()
state.Offset = 0 state.Offset = 0

View File

@ -9,12 +9,16 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
const defaultLimit = 100000
// Partitioner is an interface for interacting with partitions.
type Partitioner interface { type Partitioner interface {
Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error) Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error)
All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error) All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error)
Store(apiOp *types.APIRequest, partition Partition) (types.Store, error) Store(apiOp *types.APIRequest, partition Partition) (types.Store, error)
} }
// Store implements types.Store for partitions.
type Store struct { type Store struct {
Partitioner Partitioner Partitioner Partitioner
} }
@ -28,6 +32,7 @@ func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb,
return s.Partitioner.Store(apiOp, p) return s.Partitioner.Store(apiOp, p)
} }
// Delete deletes an object from a store.
func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "delete", id) target, err := s.getStore(apiOp, schema, "delete", id)
if err != nil { if err != nil {
@ -37,6 +42,7 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri
return target.Delete(apiOp, schema, id) return target.Delete(apiOp, schema, id)
} }
// ByID looks up a single object by its ID.
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "get", id) target, err := s.getStore(apiOp, schema, "get", id)
if err != nil { if err != nil {
@ -69,12 +75,14 @@ func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, sche
return store.List(req, schema) return store.List(req, schema)
} }
// List returns a list of objects across all applicable partitions.
// If pagination parameters are used, it returns a segment of the list.
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
var ( var (
result types.APIObjectList result types.APIObjectList
) )
paritions, err := s.Partitioner.All(apiOp, schema, "list", "") partitions, err := s.Partitioner.All(apiOp, schema, "list", "")
if err != nil { if err != nil {
return result, err return result, err
} }
@ -84,7 +92,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit) return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit)
}, },
Concurrency: 3, Concurrency: 3,
Partitions: paritions, Partitions: partitions,
} }
resume := apiOp.Request.URL.Query().Get("continue") resume := apiOp.Request.URL.Query().Get("continue")
@ -104,6 +112,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
return result, lister.Err() return result, lister.Err()
} }
// Create creates a single object in the store.
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "create", "") target, err := s.getStore(apiOp, schema, "create", "")
if err != nil { if err != nil {
@ -113,6 +122,7 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data ty
return target.Create(apiOp, schema, data) return target.Create(apiOp, schema, data)
} }
// Update updates a single object in the store.
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "update", id) target, err := s.getStore(apiOp, schema, "update", id)
if err != nil { if err != nil {
@ -122,6 +132,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data ty
return target.Update(apiOp, schema, data, id) return target.Update(apiOp, schema, data, id)
} }
// Watch returns a channel of events for a list or resource.
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) {
partitions, err := s.Partitioner.All(apiOp, schema, "watch", wr.ID) partitions, err := s.Partitioner.All(apiOp, schema, "watch", wr.ID)
if err != nil { if err != nil {
@ -164,6 +175,9 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types
return response, nil return response, nil
} }
// getLimit extracts the limit parameter from the request or sets a default of 100000.
// Since a default is always set, this implies that clients must always be
// aware that the list may be incomplete.
func getLimit(req *http.Request) int { func getLimit(req *http.Request) int {
limitString := req.URL.Query().Get("limit") limitString := req.URL.Query().Get("limit")
limit, err := strconv.Atoi(limitString) limit, err := strconv.Atoi(limitString)
@ -171,7 +185,7 @@ func getLimit(req *http.Request) int {
limit = 0 limit = 0
} }
if limit <= 0 { if limit <= 0 {
limit = 100000 limit = defaultLimit
} }
return limit return limit
} }

View File

@ -11,34 +11,38 @@ type errorStore struct {
types.Store types.Store
} }
// ByID looks up a single object by its ID.
func (e *errorStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { func (e *errorStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
data, err := e.Store.ByID(apiOp, schema, id) data, err := e.Store.ByID(apiOp, schema, id)
return data, translateError(err) return data, translateError(err)
} }
// List returns a list of resources.
func (e *errorStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { func (e *errorStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
data, err := e.Store.List(apiOp, schema) data, err := e.Store.List(apiOp, schema)
return data, translateError(err) return data, translateError(err)
} }
// Create creates a single object in the store.
func (e *errorStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { func (e *errorStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) {
data, err := e.Store.Create(apiOp, schema, data) data, err := e.Store.Create(apiOp, schema, data)
return data, translateError(err) return data, translateError(err)
} }
// Update updates a single object in the store.
func (e *errorStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { func (e *errorStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) {
data, err := e.Store.Update(apiOp, schema, data, id) data, err := e.Store.Update(apiOp, schema, data, id)
return data, translateError(err) return data, translateError(err)
} }
// Delete deletes an object from a store.
func (e *errorStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { func (e *errorStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
data, err := e.Store.Delete(apiOp, schema, id) data, err := e.Store.Delete(apiOp, schema, id)
return data, translateError(err) return data, translateError(err)
} }
// Watch returns a channel of events for a list or resource.
func (e *errorStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { func (e *errorStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) {
data, err := e.Store.Watch(apiOp, schema, wr) data, err := e.Store.Watch(apiOp, schema, wr)
return data, translateError(err) return data, translateError(err)

View File

@ -46,6 +46,7 @@ func init() {
metav1.AddToGroupVersion(paramScheme, metav1.SchemeGroupVersion) metav1.AddToGroupVersion(paramScheme, metav1.SchemeGroupVersion)
} }
// ClientGetter is a dynamic kubernetes client factory.
type ClientGetter interface { type ClientGetter interface {
IsImpersonating() bool IsImpersonating() bool
K8sInterface(ctx *types.APIRequest) (kubernetes.Interface, error) K8sInterface(ctx *types.APIRequest) (kubernetes.Interface, error)
@ -59,15 +60,18 @@ type ClientGetter interface {
TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
} }
// RelationshipNotifier is an interface for handling wrangler summary.Relationship events.
type RelationshipNotifier interface { type RelationshipNotifier interface {
OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship
} }
// Store implements types.Store directly on top of kubernetes.
type Store struct { type Store struct {
clientGetter ClientGetter clientGetter ClientGetter
notifier RelationshipNotifier notifier RelationshipNotifier
} }
// NewProxyStore returns a wrapped types.Store.
func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store { func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store {
return &errorStore{ return &errorStore{
Store: &WatchRefresh{ Store: &WatchRefresh{
@ -84,6 +88,7 @@ func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, loo
} }
} }
// ByID looks up a single object by its ID.
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
result, err := s.byID(apiOp, schema, apiOp.Namespace, id) result, err := s.byID(apiOp, schema, apiOp.Namespace, id)
return toAPI(schema, result), err return toAPI(schema, result), err
@ -219,6 +224,12 @@ func tableToObjects(obj map[string]interface{}) []unstructured.Unstructured {
return result return result
} }
// ByNames filters a list of objects by an allowed set of names.
// In plain kubernetes, if a user has permission to 'list' or 'watch' a defined set of resource names,
// performing the list or watch will result in a Forbidden error, because the user does not have permission
// to list *all* resources.
// With this filter, the request can be performed successfully, and only the allowed resources will
// be returned in the list.
func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (types.APIObjectList, error) { func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (types.APIObjectList, error) {
if apiOp.Namespace == "*" { if apiOp.Namespace == "*" {
// This happens when you grant namespaced objects with "get" by name in a clusterrolebinding. We will treat // This happens when you grant namespaced objects with "get" by name in a clusterrolebinding. We will treat
@ -247,6 +258,7 @@ func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names
return objs, nil return objs, nil
} }
// List returns a list of resources.
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace) client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace)
if err != nil { if err != nil {
@ -360,6 +372,12 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
return return
} }
// WatchNames returns a channel of events filtered by an allowed set of names.
// In plain kubernetes, if a user has permission to 'list' or 'watch' a defined set of resource names,
// performing the list or watch will result in a Forbidden error, because the user does not have permission
// to list *all* resources.
// With this filter, the request can be performed successfully, and only the allowed resources will
// be returned in watch.
func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan types.APIEvent, error) { func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan types.APIEvent, error) {
adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace) adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace)
if err != nil { if err != nil {
@ -383,6 +401,7 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t
return result, nil return result, nil
} }
// Watch returns a channel of events for a list or resource.
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace) client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace)
if err != nil { if err != nil {
@ -428,6 +447,7 @@ func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et
return event return event
} }
// Create creates a single object in the store.
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (types.APIObject, error) { func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (types.APIObject, error) {
var ( var (
resp *unstructured.Unstructured resp *unstructured.Unstructured
@ -468,6 +488,7 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params
return apiObject, err return apiObject, err
} }
// Update updates a single object in the store.
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (types.APIObject, error) { func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (types.APIObject, error) {
var ( var (
err error err error
@ -535,6 +556,7 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params
return toAPI(schema, resp), nil return toAPI(schema, resp), nil
} }
// Delete deletes an object from a store.
func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
opts := metav1.DeleteOptions{} opts := metav1.DeleteOptions{}
if err := decodeParams(apiOp, &opts); err != nil { if err := decodeParams(apiOp, &opts); err != nil {

View File

@ -1,9 +1,7 @@
package proxy package proxy
import ( import (
"context"
"fmt" "fmt"
"net/http"
"sort" "sort"
"github.com/rancher/apiserver/pkg/types" "github.com/rancher/apiserver/pkg/types"
@ -20,19 +18,7 @@ var (
} }
) )
type filterKey struct{} // Partition is an implementation of the partition.Partition interface that uses RBAC to determine how a set of resources should be segregated and accessed.
func AddNamespaceConstraint(req *http.Request, names ...string) *http.Request {
set := sets.NewString(names...)
ctx := context.WithValue(req.Context(), filterKey{}, set)
return req.WithContext(ctx)
}
func getNamespaceConstraint(req *http.Request) (sets.String, bool) {
set, ok := req.Context().Value(filterKey{}).(sets.String)
return set, ok
}
type Partition struct { type Partition struct {
Namespace string Namespace string
All bool All bool
@ -40,14 +26,18 @@ type Partition struct {
Names sets.String Names sets.String
} }
// Name returns the name of the partition, which for this type is the namespace.
func (p Partition) Name() string { func (p Partition) Name() string {
return p.Namespace return p.Namespace
} }
// rbacPartitioner is an implementation of the partition.Partioner interface.
type rbacPartitioner struct { type rbacPartitioner struct {
proxyStore *Store proxyStore *Store
} }
// Lookup returns the default passthrough partition which is used only for retrieving single resources.
// Listing or watching resources require custom partitions.
func (p *rbacPartitioner) Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (partition.Partition, error) { func (p *rbacPartitioner) Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (partition.Partition, error) {
switch verb { switch verb {
case "create": case "create":
@ -63,6 +53,9 @@ func (p *rbacPartitioner) Lookup(apiOp *types.APIRequest, schema *types.APISchem
} }
} }
// All returns a slice of partitions applicable to the API schema and the user's access level.
// For watching individual resources or for blanket access permissions, it returns the passthrough partition.
// For more granular permissions, it returns a slice of partitions matching an allowed namespace or resource names.
func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]partition.Partition, error) { func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]partition.Partition, error) {
switch verb { switch verb {
case "list": case "list":
@ -92,6 +85,7 @@ func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema,
} }
} }
// Store returns a proxy Store suited to listing and watching resources by partition.
func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (types.Store, error) { func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (types.Store, error) {
return &byNameOrNamespaceStore{ return &byNameOrNamespaceStore{
Store: p.proxyStore, Store: p.proxyStore,
@ -104,6 +98,7 @@ type byNameOrNamespaceStore struct {
partition Partition partition Partition
} }
// List returns a list of resources by partition.
func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
if b.partition.Passthrough { if b.partition.Passthrough {
return b.Store.List(apiOp, schema) return b.Store.List(apiOp, schema)
@ -116,6 +111,7 @@ func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.API
return b.Store.ByNames(apiOp, schema, b.partition.Names) return b.Store.ByNames(apiOp, schema, b.partition.Names)
} }
// Watch returns a channel of resources by partition.
func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) {
if b.partition.Passthrough { if b.partition.Passthrough {
return b.Store.Watch(apiOp, schema, wr) return b.Store.Watch(apiOp, schema, wr)
@ -128,35 +124,9 @@ func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.AP
return b.Store.WatchNames(apiOp, schema, wr, b.partition.Names) return b.Store.WatchNames(apiOp, schema, wr, b.partition.Names)
} }
// isPassthrough determines whether a request can be passed through directly to the underlying store
// or if the results need to be partitioned by namespace and name based on the requester's access.
func isPassthrough(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, bool) { func isPassthrough(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, bool) {
partitions, passthrough := isPassthroughUnconstrained(apiOp, schema, verb)
namespaces, ok := getNamespaceConstraint(apiOp.Request)
if !ok {
return partitions, passthrough
}
var result []partition.Partition
if passthrough {
for namespace := range namespaces {
result = append(result, Partition{
Namespace: namespace,
All: true,
})
}
return result, false
}
for _, partition := range partitions {
if namespaces.Has(partition.Name()) {
result = append(result, partition)
}
}
return result, false
}
func isPassthroughUnconstrained(apiOp *types.APIRequest, schema *types.APISchema, verb string) ([]partition.Partition, bool) {
accessListByVerb, _ := attributes.Access(schema).(accesscontrol.AccessListByVerb) accessListByVerb, _ := attributes.Access(schema).(accesscontrol.AccessListByVerb)
if accessListByVerb.All(verb) { if accessListByVerb.All(verb) {
return nil, true return nil, true

View File

@ -9,11 +9,13 @@ import (
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
) )
// WatchRefresh implements types.Store with awareness of changes to the requester's access.
type WatchRefresh struct { type WatchRefresh struct {
types.Store types.Store
asl accesscontrol.AccessSetLookup asl accesscontrol.AccessSetLookup
} }
// Watch performs a watch request which halts if the user's access level changes.
func (w *WatchRefresh) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { func (w *WatchRefresh) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) {
user, ok := request.UserFrom(apiOp.Context()) user, ok := request.UserFrom(apiOp.Context())
if !ok { if !ok {

View File

@ -1,59 +0,0 @@
package switchstore
import (
"github.com/rancher/apiserver/pkg/types"
)
type StorePicker func(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (types.Store, error)
type Store struct {
Picker StorePicker
}
func (e *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
s, err := e.Picker(apiOp, schema, "delete", id)
if err != nil {
return types.APIObject{}, err
}
return s.Delete(apiOp, schema, id)
}
func (e *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
s, err := e.Picker(apiOp, schema, "get", id)
if err != nil {
return types.APIObject{}, err
}
return s.ByID(apiOp, schema, id)
}
func (e *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
s, err := e.Picker(apiOp, schema, "list", "")
if err != nil {
return types.APIObjectList{}, err
}
return s.List(apiOp, schema)
}
func (e *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) {
s, err := e.Picker(apiOp, schema, "create", "")
if err != nil {
return types.APIObject{}, err
}
return s.Create(apiOp, schema, data)
}
func (e *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) {
s, err := e.Picker(apiOp, schema, "update", id)
if err != nil {
return types.APIObject{}, err
}
return s.Update(apiOp, schema, data, id)
}
func (e *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) {
s, err := e.Picker(apiOp, schema, "watch", "")
if err != nil {
return nil, err
}
return s.Watch(apiOp, schema, wr)
}