diff --git a/pkg/stores/partition/listprocessor/processor.go b/pkg/stores/partition/listprocessor/processor.go index 172312c..614a3ee 100644 --- a/pkg/stores/partition/listprocessor/processor.go +++ b/pkg/stores/partition/listprocessor/processor.go @@ -20,6 +20,7 @@ const ( sortParam = "sort" pageSizeParam = "pagesize" pageParam = "page" + revisionParam = "revision" ) // ListOptions represents the query parameters that may be included in a list request. @@ -29,6 +30,7 @@ type ListOptions struct { Filters []Filter Sort Sort Pagination Pagination + Revision string } // Filter represents a field to filter by. @@ -39,6 +41,12 @@ type Filter struct { match string } +// String returns the filter as a query string. +func (f Filter) String() string { + field := strings.Join(f.field, ".") + return field + "=" + f.match +} + // SortOrder represents whether the list should be ascending or descending. type SortOrder int @@ -58,12 +66,26 @@ type Sort struct { order SortOrder } +// String returns the sort parameters as a query string. +func (s Sort) String() string { + field := strings.Join(s.field, ".") + if s.order == DESC { + field = "-" + field + } + return field +} + // Pagination represents how to return paginated results. type Pagination struct { pageSize int page int } +// PageSize returns the integer page size. +func (p Pagination) PageSize() int { + return p.pageSize +} + // ParseQuery parses the query params of a request and returns a ListOptions. func ParseQuery(apiOp *types.APIRequest) *ListOptions { chunkSize := getLimit(apiOp) @@ -78,14 +100,20 @@ func ParseQuery(apiOp *types.APIRequest) *ListOptions { } filterOpts = append(filterOpts, Filter{field: strings.Split(filter[0], "."), match: filter[1]}) } - sort := Sort{} + // sort the filter fields so they can be used as a cache key in the store + sort.Slice(filterOpts, func(i, j int) bool { + fieldI := strings.Join(filterOpts[i].field, ".") + fieldJ := strings.Join(filterOpts[j].field, ".") + return fieldI < fieldJ + }) + sortOpts := Sort{} sortKey := q.Get(sortParam) if sortKey != "" && sortKey[0] == '-' { - sort.order = DESC + sortOpts.order = DESC sortKey = sortKey[1:] } if sortKey != "" { - sort.field = strings.Split(sortKey, ".") + sortOpts.field = strings.Split(sortKey, ".") } var err error pagination := Pagination{} @@ -97,12 +125,14 @@ func ParseQuery(apiOp *types.APIRequest) *ListOptions { if err != nil { pagination.page = 1 } + revision := q.Get(revisionParam) return &ListOptions{ ChunkSize: chunkSize, Resume: cont, Filters: filterOpts, - Sort: sort, + Sort: sortOpts, Pagination: pagination, + Revision: revision, } } diff --git a/pkg/stores/partition/store.go b/pkg/stores/partition/store.go index 6d256f8..ca9c9c7 100644 --- a/pkg/stores/partition/store.go +++ b/pkg/stores/partition/store.go @@ -5,17 +5,32 @@ package partition import ( "context" "fmt" + "os" "reflect" "strconv" + "time" "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/steve/pkg/accesscontrol" "github.com/rancher/steve/pkg/stores/partition/listprocessor" + "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/cache" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/endpoints/request" +) + +const ( + // Number of list request entries to save before cache replacement. + // Not related to the total size in memory of the cache, as any item could take any amount of memory. + cacheSizeEnv = "CATTLE_REQUEST_CACHE_SIZE_INT" + defaultCacheSize = 1000 + // Set to non-empty to disable list request caching entirely. + cacheDisableEnv = "CATTLE_REQUEST_CACHE_DISABLED" ) // Partitioner is an interface for interacting with partitions. @@ -28,6 +43,38 @@ type Partitioner interface { // Store implements types.Store for partitions. type Store struct { Partitioner Partitioner + listCache *cache.LRUExpireCache + asl accesscontrol.AccessSetLookup +} + +// NewStore creates a types.Store implementation with a partitioner and an LRU expiring cache for list responses. +func NewStore(partitioner Partitioner, asl accesscontrol.AccessSetLookup) *Store { + cacheSize := defaultCacheSize + if v := os.Getenv(cacheSizeEnv); v != "" { + sizeInt, err := strconv.Atoi(v) + if err == nil { + cacheSize = sizeInt + } + } + s := &Store{ + Partitioner: partitioner, + asl: asl, + } + if v := os.Getenv(cacheDisableEnv); v == "" { + s.listCache = cache.NewLRUExpireCache(cacheSize) + } + return s +} + +type cacheKey struct { + chunkSize int + resume string + filters string + sort string + pageSize int + accessID string + resourcePath string + revision string } // UnstructuredStore is like types.Store but deals in k8s unstructured objects instead of apiserver types. @@ -122,13 +169,40 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP opts := listprocessor.ParseQuery(apiOp) - stream, err := lister.List(apiOp.Context(), opts.ChunkSize, opts.Resume) + key, err := s.getCacheKey(apiOp, opts) if err != nil { return result, err } - list := listprocessor.FilterList(stream, opts.Filters) - list = listprocessor.SortList(list, opts.Sort) + var list []unstructured.Unstructured + if key.revision != "" && s.listCache != nil { + cachedList, ok := s.listCache.Get(key) + if ok { + logrus.Tracef("found cached list for query %s?%s", apiOp.Request.URL.Path, apiOp.Request.URL.RawQuery) + list = cachedList.(*unstructured.UnstructuredList).Items + result.Continue = cachedList.(*unstructured.UnstructuredList).GetContinue() + } + } + if list == nil { // did not look in cache or was not found in cache + stream, err := lister.List(apiOp.Context(), opts.ChunkSize, opts.Resume) + if err != nil { + return result, err + } + list = listprocessor.FilterList(stream, opts.Filters) + list = listprocessor.SortList(list, opts.Sort) + key.revision = lister.Revision() + listToCache := &unstructured.UnstructuredList{ + Items: list, + } + c := lister.Continue() + if c != "" { + listToCache.SetContinue(c) + } + if s.listCache != nil { + s.listCache.Add(key, listToCache, 30*time.Minute) + } + result.Continue = lister.Continue() + } result.Count = len(list) list, pages := listprocessor.PaginateList(list, opts.Pagination) @@ -137,13 +211,33 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP result.Objects = append(result.Objects, toAPI(schema, &item)) } - result.Revision = lister.Revision() - result.Continue = lister.Continue() + result.Revision = key.revision result.Pages = pages - return result, lister.Err() } +// getCacheKey returns a hashable struct identifying a unique user and request. +func (s *Store) getCacheKey(apiOp *types.APIRequest, opts *listprocessor.ListOptions) (cacheKey, error) { + user, ok := request.UserFrom(apiOp.Request.Context()) + if !ok { + return cacheKey{}, fmt.Errorf("could not find user in request") + } + filters := "" + for _, f := range opts.Filters { + filters = filters + f.String() + } + return cacheKey{ + chunkSize: opts.ChunkSize, + resume: opts.Resume, + filters: filters, + sort: opts.Sort.String(), + pageSize: opts.Pagination.PageSize(), + accessID: s.asl.AccessFor(user).ID, + resourcePath: apiOp.Request.URL.Path, + revision: opts.Revision, + }, nil +} + // Create creates a single object in the store. func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { target, err := s.getStore(apiOp, schema, "create", "") diff --git a/pkg/stores/partition/store_test.go b/pkg/stores/partition/store_test.go index 4b50c25..21325d7 100644 --- a/pkg/stores/partition/store_test.go +++ b/pkg/stores/partition/store_test.go @@ -1,6 +1,7 @@ package partition import ( + "context" "encoding/base64" "fmt" "net/http" @@ -9,10 +10,13 @@ import ( "testing" "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/steve/pkg/accesscontrol" "github.com/rancher/wrangler/pkg/schemas" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/request" ) func TestList(t *testing.T) { @@ -26,7 +30,7 @@ func TestList(t *testing.T) { { name: "basic", apiOps: []*types.APIRequest{ - newRequest(""), + newRequest("", "user1"), }, partitions: []Partition{ mockPartition{ @@ -52,9 +56,9 @@ func TestList(t *testing.T) { { name: "limit and continue", apiOps: []*types.APIRequest{ - newRequest("limit=1"), - newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("granny-smith"))))))), - newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("crispin"))))))), + newRequest("limit=1", "user1"), + newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("granny-smith")))))), "user1"), + newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("crispin")))))), "user1"), }, partitions: []Partition{ mockPartition{ @@ -96,7 +100,7 @@ func TestList(t *testing.T) { { name: "multi-partition", apiOps: []*types.APIRequest{ - newRequest(""), + newRequest("", "user1"), }, partitions: []Partition{ mockPartition{ @@ -136,9 +140,9 @@ func TestList(t *testing.T) { { name: "multi-partition with limit and continue", apiOps: []*types.APIRequest{ - newRequest("limit=3"), - newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"green","o":1,"l":3}`)))), - newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"red","l":3}`)))), + newRequest("limit=3", "user1"), + newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"green","o":1,"l":3}`))), "user1"), + newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"red","l":3}`))), "user1"), }, partitions: []Partition{ mockPartition{ @@ -209,8 +213,8 @@ func TestList(t *testing.T) { { name: "with filters", apiOps: []*types.APIRequest{ - newRequest("filter=data.color=green"), - newRequest("filter=data.color=green&filter=metadata.name=bramley"), + newRequest("filter=data.color=green", "user1"), + newRequest("filter=data.color=green&filter=metadata.name=bramley", "user1"), }, partitions: []Partition{ mockPartition{ @@ -246,7 +250,7 @@ func TestList(t *testing.T) { { name: "multi-partition with filters", apiOps: []*types.APIRequest{ - newRequest("filter=data.category=baking"), + newRequest("filter=data.category=baking", "user1"), }, partitions: []Partition{ mockPartition{ @@ -292,8 +296,8 @@ func TestList(t *testing.T) { { name: "with sorting", apiOps: []*types.APIRequest{ - newRequest("sort=metadata.name"), - newRequest("sort=-metadata.name"), + newRequest("sort=metadata.name", "user1"), + newRequest("sort=-metadata.name", "user1"), }, partitions: []Partition{ mockPartition{ @@ -334,7 +338,7 @@ func TestList(t *testing.T) { { name: "multi-partition sort=metadata.name", apiOps: []*types.APIRequest{ - newRequest("sort=metadata.name"), + newRequest("sort=metadata.name", "user1"), }, partitions: []Partition{ mockPartition{ @@ -381,12 +385,11 @@ func TestList(t *testing.T) { contents: test.objects[p.Name()], } } - store := Store{ - Partitioner: mockPartitioner{ - stores: stores, - partitions: test.partitions, - }, - } + asl := &mockAccessSetLookup{} + store := NewStore(mockPartitioner{ + stores: stores, + partitions: test.partitions, + }, asl) for i, req := range test.apiOps { got, gotErr := store.List(req, schema) assert.Nil(t, gotErr) @@ -488,16 +491,19 @@ var colorMap = map[string]string{ "red-delicious": "red", } -func newRequest(query string) *types.APIRequest { +func newRequest(query, username string) *types.APIRequest { return &types.APIRequest{ - Request: &http.Request{ + Request: (&http.Request{ URL: &url.URL{ Scheme: "https", Host: "rancher", Path: "/apples", RawQuery: query, }, - }, + }).WithContext(request.WithUser(context.Background(), &user.DefaultInfo{ + Name: username, + Groups: []string{"system:authenticated"}, + })), } } @@ -533,3 +539,15 @@ func (a apple) with(data map[string]string) apple { } return a } + +type mockAccessSetLookup struct{} + +func (m *mockAccessSetLookup) AccessFor(_ user.Info) *accesscontrol.AccessSet { + return &accesscontrol.AccessSet{ + ID: "aabbccdd", + } +} + +func (m *mockAccessSetLookup) PurgeUserData(_ string) { + panic("not implemented") +} diff --git a/pkg/stores/proxy/proxy_store.go b/pkg/stores/proxy/proxy_store.go index c3c40c8..21aeba6 100644 --- a/pkg/stores/proxy/proxy_store.go +++ b/pkg/stores/proxy/proxy_store.go @@ -75,14 +75,15 @@ type Store struct { func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store { return &errorStore{ Store: &WatchRefresh{ - Store: &partition.Store{ - Partitioner: &rbacPartitioner{ + Store: partition.NewStore( + &rbacPartitioner{ proxyStore: &Store{ clientGetter: clientGetter, notifier: notifier, }, }, - }, + lookup, + ), asl: lookup, }, }