From b0eb50f38d741a86841d2115adb7e53ec5f2c954 Mon Sep 17 00:00:00 2001 From: Colleen Murphy Date: Thu, 3 Nov 2022 15:26:49 -0700 Subject: [PATCH] Add caching to pagination Cache filtered, sorted results for fast subsequent page retrieval. Requests for cached queries need to include the list revision number along with other queries. If no specific revision is requested, a new fetch is done in order to get the latest revision. The revision is included in the list response. Example first request: GET /v1/secrets?pagesize=10 Example subsequent page request: GET /v1/secrets?pagesize=10&page=1&revision=107740 --- .../partition/listprocessor/processor.go | 38 ++++++- pkg/stores/partition/store.go | 106 +++++++++++++++++- pkg/stores/partition/store_test.go | 64 +++++++---- pkg/stores/proxy/proxy_store.go | 7 +- 4 files changed, 179 insertions(+), 36 deletions(-) diff --git a/pkg/stores/partition/listprocessor/processor.go b/pkg/stores/partition/listprocessor/processor.go index 172312c..614a3ee 100644 --- a/pkg/stores/partition/listprocessor/processor.go +++ b/pkg/stores/partition/listprocessor/processor.go @@ -20,6 +20,7 @@ const ( sortParam = "sort" pageSizeParam = "pagesize" pageParam = "page" + revisionParam = "revision" ) // ListOptions represents the query parameters that may be included in a list request. @@ -29,6 +30,7 @@ type ListOptions struct { Filters []Filter Sort Sort Pagination Pagination + Revision string } // Filter represents a field to filter by. @@ -39,6 +41,12 @@ type Filter struct { match string } +// String returns the filter as a query string. +func (f Filter) String() string { + field := strings.Join(f.field, ".") + return field + "=" + f.match +} + // SortOrder represents whether the list should be ascending or descending. type SortOrder int @@ -58,12 +66,26 @@ type Sort struct { order SortOrder } +// String returns the sort parameters as a query string. +func (s Sort) String() string { + field := strings.Join(s.field, ".") + if s.order == DESC { + field = "-" + field + } + return field +} + // Pagination represents how to return paginated results. type Pagination struct { pageSize int page int } +// PageSize returns the integer page size. +func (p Pagination) PageSize() int { + return p.pageSize +} + // ParseQuery parses the query params of a request and returns a ListOptions. func ParseQuery(apiOp *types.APIRequest) *ListOptions { chunkSize := getLimit(apiOp) @@ -78,14 +100,20 @@ func ParseQuery(apiOp *types.APIRequest) *ListOptions { } filterOpts = append(filterOpts, Filter{field: strings.Split(filter[0], "."), match: filter[1]}) } - sort := Sort{} + // sort the filter fields so they can be used as a cache key in the store + sort.Slice(filterOpts, func(i, j int) bool { + fieldI := strings.Join(filterOpts[i].field, ".") + fieldJ := strings.Join(filterOpts[j].field, ".") + return fieldI < fieldJ + }) + sortOpts := Sort{} sortKey := q.Get(sortParam) if sortKey != "" && sortKey[0] == '-' { - sort.order = DESC + sortOpts.order = DESC sortKey = sortKey[1:] } if sortKey != "" { - sort.field = strings.Split(sortKey, ".") + sortOpts.field = strings.Split(sortKey, ".") } var err error pagination := Pagination{} @@ -97,12 +125,14 @@ func ParseQuery(apiOp *types.APIRequest) *ListOptions { if err != nil { pagination.page = 1 } + revision := q.Get(revisionParam) return &ListOptions{ ChunkSize: chunkSize, Resume: cont, Filters: filterOpts, - Sort: sort, + Sort: sortOpts, Pagination: pagination, + Revision: revision, } } diff --git a/pkg/stores/partition/store.go b/pkg/stores/partition/store.go index 6d256f8..ca9c9c7 100644 --- a/pkg/stores/partition/store.go +++ b/pkg/stores/partition/store.go @@ -5,17 +5,32 @@ package partition import ( "context" "fmt" + "os" "reflect" "strconv" + "time" "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/steve/pkg/accesscontrol" "github.com/rancher/steve/pkg/stores/partition/listprocessor" + "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/cache" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/endpoints/request" +) + +const ( + // Number of list request entries to save before cache replacement. + // Not related to the total size in memory of the cache, as any item could take any amount of memory. + cacheSizeEnv = "CATTLE_REQUEST_CACHE_SIZE_INT" + defaultCacheSize = 1000 + // Set to non-empty to disable list request caching entirely. + cacheDisableEnv = "CATTLE_REQUEST_CACHE_DISABLED" ) // Partitioner is an interface for interacting with partitions. @@ -28,6 +43,38 @@ type Partitioner interface { // Store implements types.Store for partitions. type Store struct { Partitioner Partitioner + listCache *cache.LRUExpireCache + asl accesscontrol.AccessSetLookup +} + +// NewStore creates a types.Store implementation with a partitioner and an LRU expiring cache for list responses. +func NewStore(partitioner Partitioner, asl accesscontrol.AccessSetLookup) *Store { + cacheSize := defaultCacheSize + if v := os.Getenv(cacheSizeEnv); v != "" { + sizeInt, err := strconv.Atoi(v) + if err == nil { + cacheSize = sizeInt + } + } + s := &Store{ + Partitioner: partitioner, + asl: asl, + } + if v := os.Getenv(cacheDisableEnv); v == "" { + s.listCache = cache.NewLRUExpireCache(cacheSize) + } + return s +} + +type cacheKey struct { + chunkSize int + resume string + filters string + sort string + pageSize int + accessID string + resourcePath string + revision string } // UnstructuredStore is like types.Store but deals in k8s unstructured objects instead of apiserver types. @@ -122,13 +169,40 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP opts := listprocessor.ParseQuery(apiOp) - stream, err := lister.List(apiOp.Context(), opts.ChunkSize, opts.Resume) + key, err := s.getCacheKey(apiOp, opts) if err != nil { return result, err } - list := listprocessor.FilterList(stream, opts.Filters) - list = listprocessor.SortList(list, opts.Sort) + var list []unstructured.Unstructured + if key.revision != "" && s.listCache != nil { + cachedList, ok := s.listCache.Get(key) + if ok { + logrus.Tracef("found cached list for query %s?%s", apiOp.Request.URL.Path, apiOp.Request.URL.RawQuery) + list = cachedList.(*unstructured.UnstructuredList).Items + result.Continue = cachedList.(*unstructured.UnstructuredList).GetContinue() + } + } + if list == nil { // did not look in cache or was not found in cache + stream, err := lister.List(apiOp.Context(), opts.ChunkSize, opts.Resume) + if err != nil { + return result, err + } + list = listprocessor.FilterList(stream, opts.Filters) + list = listprocessor.SortList(list, opts.Sort) + key.revision = lister.Revision() + listToCache := &unstructured.UnstructuredList{ + Items: list, + } + c := lister.Continue() + if c != "" { + listToCache.SetContinue(c) + } + if s.listCache != nil { + s.listCache.Add(key, listToCache, 30*time.Minute) + } + result.Continue = lister.Continue() + } result.Count = len(list) list, pages := listprocessor.PaginateList(list, opts.Pagination) @@ -137,13 +211,33 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP result.Objects = append(result.Objects, toAPI(schema, &item)) } - result.Revision = lister.Revision() - result.Continue = lister.Continue() + result.Revision = key.revision result.Pages = pages - return result, lister.Err() } +// getCacheKey returns a hashable struct identifying a unique user and request. +func (s *Store) getCacheKey(apiOp *types.APIRequest, opts *listprocessor.ListOptions) (cacheKey, error) { + user, ok := request.UserFrom(apiOp.Request.Context()) + if !ok { + return cacheKey{}, fmt.Errorf("could not find user in request") + } + filters := "" + for _, f := range opts.Filters { + filters = filters + f.String() + } + return cacheKey{ + chunkSize: opts.ChunkSize, + resume: opts.Resume, + filters: filters, + sort: opts.Sort.String(), + pageSize: opts.Pagination.PageSize(), + accessID: s.asl.AccessFor(user).ID, + resourcePath: apiOp.Request.URL.Path, + revision: opts.Revision, + }, nil +} + // Create creates a single object in the store. func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { target, err := s.getStore(apiOp, schema, "create", "") diff --git a/pkg/stores/partition/store_test.go b/pkg/stores/partition/store_test.go index 4b50c25..21325d7 100644 --- a/pkg/stores/partition/store_test.go +++ b/pkg/stores/partition/store_test.go @@ -1,6 +1,7 @@ package partition import ( + "context" "encoding/base64" "fmt" "net/http" @@ -9,10 +10,13 @@ import ( "testing" "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/steve/pkg/accesscontrol" "github.com/rancher/wrangler/pkg/schemas" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/request" ) func TestList(t *testing.T) { @@ -26,7 +30,7 @@ func TestList(t *testing.T) { { name: "basic", apiOps: []*types.APIRequest{ - newRequest(""), + newRequest("", "user1"), }, partitions: []Partition{ mockPartition{ @@ -52,9 +56,9 @@ func TestList(t *testing.T) { { name: "limit and continue", apiOps: []*types.APIRequest{ - newRequest("limit=1"), - newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("granny-smith"))))))), - newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("crispin"))))))), + newRequest("limit=1", "user1"), + newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("granny-smith")))))), "user1"), + newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("crispin")))))), "user1"), }, partitions: []Partition{ mockPartition{ @@ -96,7 +100,7 @@ func TestList(t *testing.T) { { name: "multi-partition", apiOps: []*types.APIRequest{ - newRequest(""), + newRequest("", "user1"), }, partitions: []Partition{ mockPartition{ @@ -136,9 +140,9 @@ func TestList(t *testing.T) { { name: "multi-partition with limit and continue", apiOps: []*types.APIRequest{ - newRequest("limit=3"), - newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"green","o":1,"l":3}`)))), - newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"red","l":3}`)))), + newRequest("limit=3", "user1"), + newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"green","o":1,"l":3}`))), "user1"), + newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"red","l":3}`))), "user1"), }, partitions: []Partition{ mockPartition{ @@ -209,8 +213,8 @@ func TestList(t *testing.T) { { name: "with filters", apiOps: []*types.APIRequest{ - newRequest("filter=data.color=green"), - newRequest("filter=data.color=green&filter=metadata.name=bramley"), + newRequest("filter=data.color=green", "user1"), + newRequest("filter=data.color=green&filter=metadata.name=bramley", "user1"), }, partitions: []Partition{ mockPartition{ @@ -246,7 +250,7 @@ func TestList(t *testing.T) { { name: "multi-partition with filters", apiOps: []*types.APIRequest{ - newRequest("filter=data.category=baking"), + newRequest("filter=data.category=baking", "user1"), }, partitions: []Partition{ mockPartition{ @@ -292,8 +296,8 @@ func TestList(t *testing.T) { { name: "with sorting", apiOps: []*types.APIRequest{ - newRequest("sort=metadata.name"), - newRequest("sort=-metadata.name"), + newRequest("sort=metadata.name", "user1"), + newRequest("sort=-metadata.name", "user1"), }, partitions: []Partition{ mockPartition{ @@ -334,7 +338,7 @@ func TestList(t *testing.T) { { name: "multi-partition sort=metadata.name", apiOps: []*types.APIRequest{ - newRequest("sort=metadata.name"), + newRequest("sort=metadata.name", "user1"), }, partitions: []Partition{ mockPartition{ @@ -381,12 +385,11 @@ func TestList(t *testing.T) { contents: test.objects[p.Name()], } } - store := Store{ - Partitioner: mockPartitioner{ - stores: stores, - partitions: test.partitions, - }, - } + asl := &mockAccessSetLookup{} + store := NewStore(mockPartitioner{ + stores: stores, + partitions: test.partitions, + }, asl) for i, req := range test.apiOps { got, gotErr := store.List(req, schema) assert.Nil(t, gotErr) @@ -488,16 +491,19 @@ var colorMap = map[string]string{ "red-delicious": "red", } -func newRequest(query string) *types.APIRequest { +func newRequest(query, username string) *types.APIRequest { return &types.APIRequest{ - Request: &http.Request{ + Request: (&http.Request{ URL: &url.URL{ Scheme: "https", Host: "rancher", Path: "/apples", RawQuery: query, }, - }, + }).WithContext(request.WithUser(context.Background(), &user.DefaultInfo{ + Name: username, + Groups: []string{"system:authenticated"}, + })), } } @@ -533,3 +539,15 @@ func (a apple) with(data map[string]string) apple { } return a } + +type mockAccessSetLookup struct{} + +func (m *mockAccessSetLookup) AccessFor(_ user.Info) *accesscontrol.AccessSet { + return &accesscontrol.AccessSet{ + ID: "aabbccdd", + } +} + +func (m *mockAccessSetLookup) PurgeUserData(_ string) { + panic("not implemented") +} diff --git a/pkg/stores/proxy/proxy_store.go b/pkg/stores/proxy/proxy_store.go index c3c40c8..21aeba6 100644 --- a/pkg/stores/proxy/proxy_store.go +++ b/pkg/stores/proxy/proxy_store.go @@ -75,14 +75,15 @@ type Store struct { func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store { return &errorStore{ Store: &WatchRefresh{ - Store: &partition.Store{ - Partitioner: &rbacPartitioner{ + Store: partition.NewStore( + &rbacPartitioner{ proxyStore: &Store{ clientGetter: clientGetter, notifier: notifier, }, }, - }, + lookup, + ), asl: lookup, }, }