mirror of
https://github.com/niusmallnan/steve.git
synced 2025-07-31 05:50:08 +00:00
Add caching to pagination
Cache filtered, sorted results for fast subsequent page retrieval. Requests for cached queries need to include the list revision number along with other queries. If no specific revision is requested, a new fetch is done in order to get the latest revision. The revision is included in the list response. Example first request: GET /v1/secrets?pagesize=10 Example subsequent page request: GET /v1/secrets?pagesize=10&page=1&revision=107740
This commit is contained in:
parent
9f1a27db06
commit
b0eb50f38d
@ -20,6 +20,7 @@ const (
|
||||
sortParam = "sort"
|
||||
pageSizeParam = "pagesize"
|
||||
pageParam = "page"
|
||||
revisionParam = "revision"
|
||||
)
|
||||
|
||||
// ListOptions represents the query parameters that may be included in a list request.
|
||||
@ -29,6 +30,7 @@ type ListOptions struct {
|
||||
Filters []Filter
|
||||
Sort Sort
|
||||
Pagination Pagination
|
||||
Revision string
|
||||
}
|
||||
|
||||
// Filter represents a field to filter by.
|
||||
@ -39,6 +41,12 @@ type Filter struct {
|
||||
match string
|
||||
}
|
||||
|
||||
// String returns the filter as a query string.
|
||||
func (f Filter) String() string {
|
||||
field := strings.Join(f.field, ".")
|
||||
return field + "=" + f.match
|
||||
}
|
||||
|
||||
// SortOrder represents whether the list should be ascending or descending.
|
||||
type SortOrder int
|
||||
|
||||
@ -58,12 +66,26 @@ type Sort struct {
|
||||
order SortOrder
|
||||
}
|
||||
|
||||
// String returns the sort parameters as a query string.
|
||||
func (s Sort) String() string {
|
||||
field := strings.Join(s.field, ".")
|
||||
if s.order == DESC {
|
||||
field = "-" + field
|
||||
}
|
||||
return field
|
||||
}
|
||||
|
||||
// Pagination represents how to return paginated results.
|
||||
type Pagination struct {
|
||||
pageSize int
|
||||
page int
|
||||
}
|
||||
|
||||
// PageSize returns the integer page size.
|
||||
func (p Pagination) PageSize() int {
|
||||
return p.pageSize
|
||||
}
|
||||
|
||||
// ParseQuery parses the query params of a request and returns a ListOptions.
|
||||
func ParseQuery(apiOp *types.APIRequest) *ListOptions {
|
||||
chunkSize := getLimit(apiOp)
|
||||
@ -78,14 +100,20 @@ func ParseQuery(apiOp *types.APIRequest) *ListOptions {
|
||||
}
|
||||
filterOpts = append(filterOpts, Filter{field: strings.Split(filter[0], "."), match: filter[1]})
|
||||
}
|
||||
sort := Sort{}
|
||||
// sort the filter fields so they can be used as a cache key in the store
|
||||
sort.Slice(filterOpts, func(i, j int) bool {
|
||||
fieldI := strings.Join(filterOpts[i].field, ".")
|
||||
fieldJ := strings.Join(filterOpts[j].field, ".")
|
||||
return fieldI < fieldJ
|
||||
})
|
||||
sortOpts := Sort{}
|
||||
sortKey := q.Get(sortParam)
|
||||
if sortKey != "" && sortKey[0] == '-' {
|
||||
sort.order = DESC
|
||||
sortOpts.order = DESC
|
||||
sortKey = sortKey[1:]
|
||||
}
|
||||
if sortKey != "" {
|
||||
sort.field = strings.Split(sortKey, ".")
|
||||
sortOpts.field = strings.Split(sortKey, ".")
|
||||
}
|
||||
var err error
|
||||
pagination := Pagination{}
|
||||
@ -97,12 +125,14 @@ func ParseQuery(apiOp *types.APIRequest) *ListOptions {
|
||||
if err != nil {
|
||||
pagination.page = 1
|
||||
}
|
||||
revision := q.Get(revisionParam)
|
||||
return &ListOptions{
|
||||
ChunkSize: chunkSize,
|
||||
Resume: cont,
|
||||
Filters: filterOpts,
|
||||
Sort: sort,
|
||||
Sort: sortOpts,
|
||||
Pagination: pagination,
|
||||
Revision: revision,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,17 +5,32 @@ package partition
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/rancher/apiserver/pkg/types"
|
||||
"github.com/rancher/steve/pkg/accesscontrol"
|
||||
"github.com/rancher/steve/pkg/stores/partition/listprocessor"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/cache"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
const (
|
||||
// Number of list request entries to save before cache replacement.
|
||||
// Not related to the total size in memory of the cache, as any item could take any amount of memory.
|
||||
cacheSizeEnv = "CATTLE_REQUEST_CACHE_SIZE_INT"
|
||||
defaultCacheSize = 1000
|
||||
// Set to non-empty to disable list request caching entirely.
|
||||
cacheDisableEnv = "CATTLE_REQUEST_CACHE_DISABLED"
|
||||
)
|
||||
|
||||
// Partitioner is an interface for interacting with partitions.
|
||||
@ -28,6 +43,38 @@ type Partitioner interface {
|
||||
// Store implements types.Store for partitions.
|
||||
type Store struct {
|
||||
Partitioner Partitioner
|
||||
listCache *cache.LRUExpireCache
|
||||
asl accesscontrol.AccessSetLookup
|
||||
}
|
||||
|
||||
// NewStore creates a types.Store implementation with a partitioner and an LRU expiring cache for list responses.
|
||||
func NewStore(partitioner Partitioner, asl accesscontrol.AccessSetLookup) *Store {
|
||||
cacheSize := defaultCacheSize
|
||||
if v := os.Getenv(cacheSizeEnv); v != "" {
|
||||
sizeInt, err := strconv.Atoi(v)
|
||||
if err == nil {
|
||||
cacheSize = sizeInt
|
||||
}
|
||||
}
|
||||
s := &Store{
|
||||
Partitioner: partitioner,
|
||||
asl: asl,
|
||||
}
|
||||
if v := os.Getenv(cacheDisableEnv); v == "" {
|
||||
s.listCache = cache.NewLRUExpireCache(cacheSize)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
type cacheKey struct {
|
||||
chunkSize int
|
||||
resume string
|
||||
filters string
|
||||
sort string
|
||||
pageSize int
|
||||
accessID string
|
||||
resourcePath string
|
||||
revision string
|
||||
}
|
||||
|
||||
// UnstructuredStore is like types.Store but deals in k8s unstructured objects instead of apiserver types.
|
||||
@ -122,13 +169,40 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
|
||||
|
||||
opts := listprocessor.ParseQuery(apiOp)
|
||||
|
||||
stream, err := lister.List(apiOp.Context(), opts.ChunkSize, opts.Resume)
|
||||
key, err := s.getCacheKey(apiOp, opts)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
list := listprocessor.FilterList(stream, opts.Filters)
|
||||
list = listprocessor.SortList(list, opts.Sort)
|
||||
var list []unstructured.Unstructured
|
||||
if key.revision != "" && s.listCache != nil {
|
||||
cachedList, ok := s.listCache.Get(key)
|
||||
if ok {
|
||||
logrus.Tracef("found cached list for query %s?%s", apiOp.Request.URL.Path, apiOp.Request.URL.RawQuery)
|
||||
list = cachedList.(*unstructured.UnstructuredList).Items
|
||||
result.Continue = cachedList.(*unstructured.UnstructuredList).GetContinue()
|
||||
}
|
||||
}
|
||||
if list == nil { // did not look in cache or was not found in cache
|
||||
stream, err := lister.List(apiOp.Context(), opts.ChunkSize, opts.Resume)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
list = listprocessor.FilterList(stream, opts.Filters)
|
||||
list = listprocessor.SortList(list, opts.Sort)
|
||||
key.revision = lister.Revision()
|
||||
listToCache := &unstructured.UnstructuredList{
|
||||
Items: list,
|
||||
}
|
||||
c := lister.Continue()
|
||||
if c != "" {
|
||||
listToCache.SetContinue(c)
|
||||
}
|
||||
if s.listCache != nil {
|
||||
s.listCache.Add(key, listToCache, 30*time.Minute)
|
||||
}
|
||||
result.Continue = lister.Continue()
|
||||
}
|
||||
result.Count = len(list)
|
||||
list, pages := listprocessor.PaginateList(list, opts.Pagination)
|
||||
|
||||
@ -137,13 +211,33 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
|
||||
result.Objects = append(result.Objects, toAPI(schema, &item))
|
||||
}
|
||||
|
||||
result.Revision = lister.Revision()
|
||||
result.Continue = lister.Continue()
|
||||
result.Revision = key.revision
|
||||
result.Pages = pages
|
||||
|
||||
return result, lister.Err()
|
||||
}
|
||||
|
||||
// getCacheKey returns a hashable struct identifying a unique user and request.
|
||||
func (s *Store) getCacheKey(apiOp *types.APIRequest, opts *listprocessor.ListOptions) (cacheKey, error) {
|
||||
user, ok := request.UserFrom(apiOp.Request.Context())
|
||||
if !ok {
|
||||
return cacheKey{}, fmt.Errorf("could not find user in request")
|
||||
}
|
||||
filters := ""
|
||||
for _, f := range opts.Filters {
|
||||
filters = filters + f.String()
|
||||
}
|
||||
return cacheKey{
|
||||
chunkSize: opts.ChunkSize,
|
||||
resume: opts.Resume,
|
||||
filters: filters,
|
||||
sort: opts.Sort.String(),
|
||||
pageSize: opts.Pagination.PageSize(),
|
||||
accessID: s.asl.AccessFor(user).ID,
|
||||
resourcePath: apiOp.Request.URL.Path,
|
||||
revision: opts.Revision,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Create creates a single object in the store.
|
||||
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) {
|
||||
target, err := s.getStore(apiOp, schema, "create", "")
|
||||
|
@ -1,6 +1,7 @@
|
||||
package partition
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"net/http"
|
||||
@ -9,10 +10,13 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/rancher/apiserver/pkg/types"
|
||||
"github.com/rancher/steve/pkg/accesscontrol"
|
||||
"github.com/rancher/wrangler/pkg/schemas"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
)
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
@ -26,7 +30,7 @@ func TestList(t *testing.T) {
|
||||
{
|
||||
name: "basic",
|
||||
apiOps: []*types.APIRequest{
|
||||
newRequest(""),
|
||||
newRequest("", "user1"),
|
||||
},
|
||||
partitions: []Partition{
|
||||
mockPartition{
|
||||
@ -52,9 +56,9 @@ func TestList(t *testing.T) {
|
||||
{
|
||||
name: "limit and continue",
|
||||
apiOps: []*types.APIRequest{
|
||||
newRequest("limit=1"),
|
||||
newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("granny-smith"))))))),
|
||||
newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("crispin"))))))),
|
||||
newRequest("limit=1", "user1"),
|
||||
newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("granny-smith")))))), "user1"),
|
||||
newRequest(fmt.Sprintf("limit=1&continue=%s", base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`{"p":"all","c":"%s","l":1}`, base64.StdEncoding.EncodeToString([]byte("crispin")))))), "user1"),
|
||||
},
|
||||
partitions: []Partition{
|
||||
mockPartition{
|
||||
@ -96,7 +100,7 @@ func TestList(t *testing.T) {
|
||||
{
|
||||
name: "multi-partition",
|
||||
apiOps: []*types.APIRequest{
|
||||
newRequest(""),
|
||||
newRequest("", "user1"),
|
||||
},
|
||||
partitions: []Partition{
|
||||
mockPartition{
|
||||
@ -136,9 +140,9 @@ func TestList(t *testing.T) {
|
||||
{
|
||||
name: "multi-partition with limit and continue",
|
||||
apiOps: []*types.APIRequest{
|
||||
newRequest("limit=3"),
|
||||
newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"green","o":1,"l":3}`)))),
|
||||
newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"red","l":3}`)))),
|
||||
newRequest("limit=3", "user1"),
|
||||
newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"green","o":1,"l":3}`))), "user1"),
|
||||
newRequest(fmt.Sprintf("limit=3&continue=%s", base64.StdEncoding.EncodeToString([]byte(`{"p":"red","l":3}`))), "user1"),
|
||||
},
|
||||
partitions: []Partition{
|
||||
mockPartition{
|
||||
@ -209,8 +213,8 @@ func TestList(t *testing.T) {
|
||||
{
|
||||
name: "with filters",
|
||||
apiOps: []*types.APIRequest{
|
||||
newRequest("filter=data.color=green"),
|
||||
newRequest("filter=data.color=green&filter=metadata.name=bramley"),
|
||||
newRequest("filter=data.color=green", "user1"),
|
||||
newRequest("filter=data.color=green&filter=metadata.name=bramley", "user1"),
|
||||
},
|
||||
partitions: []Partition{
|
||||
mockPartition{
|
||||
@ -246,7 +250,7 @@ func TestList(t *testing.T) {
|
||||
{
|
||||
name: "multi-partition with filters",
|
||||
apiOps: []*types.APIRequest{
|
||||
newRequest("filter=data.category=baking"),
|
||||
newRequest("filter=data.category=baking", "user1"),
|
||||
},
|
||||
partitions: []Partition{
|
||||
mockPartition{
|
||||
@ -292,8 +296,8 @@ func TestList(t *testing.T) {
|
||||
{
|
||||
name: "with sorting",
|
||||
apiOps: []*types.APIRequest{
|
||||
newRequest("sort=metadata.name"),
|
||||
newRequest("sort=-metadata.name"),
|
||||
newRequest("sort=metadata.name", "user1"),
|
||||
newRequest("sort=-metadata.name", "user1"),
|
||||
},
|
||||
partitions: []Partition{
|
||||
mockPartition{
|
||||
@ -334,7 +338,7 @@ func TestList(t *testing.T) {
|
||||
{
|
||||
name: "multi-partition sort=metadata.name",
|
||||
apiOps: []*types.APIRequest{
|
||||
newRequest("sort=metadata.name"),
|
||||
newRequest("sort=metadata.name", "user1"),
|
||||
},
|
||||
partitions: []Partition{
|
||||
mockPartition{
|
||||
@ -381,12 +385,11 @@ func TestList(t *testing.T) {
|
||||
contents: test.objects[p.Name()],
|
||||
}
|
||||
}
|
||||
store := Store{
|
||||
Partitioner: mockPartitioner{
|
||||
stores: stores,
|
||||
partitions: test.partitions,
|
||||
},
|
||||
}
|
||||
asl := &mockAccessSetLookup{}
|
||||
store := NewStore(mockPartitioner{
|
||||
stores: stores,
|
||||
partitions: test.partitions,
|
||||
}, asl)
|
||||
for i, req := range test.apiOps {
|
||||
got, gotErr := store.List(req, schema)
|
||||
assert.Nil(t, gotErr)
|
||||
@ -488,16 +491,19 @@ var colorMap = map[string]string{
|
||||
"red-delicious": "red",
|
||||
}
|
||||
|
||||
func newRequest(query string) *types.APIRequest {
|
||||
func newRequest(query, username string) *types.APIRequest {
|
||||
return &types.APIRequest{
|
||||
Request: &http.Request{
|
||||
Request: (&http.Request{
|
||||
URL: &url.URL{
|
||||
Scheme: "https",
|
||||
Host: "rancher",
|
||||
Path: "/apples",
|
||||
RawQuery: query,
|
||||
},
|
||||
},
|
||||
}).WithContext(request.WithUser(context.Background(), &user.DefaultInfo{
|
||||
Name: username,
|
||||
Groups: []string{"system:authenticated"},
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
@ -533,3 +539,15 @@ func (a apple) with(data map[string]string) apple {
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
type mockAccessSetLookup struct{}
|
||||
|
||||
func (m *mockAccessSetLookup) AccessFor(_ user.Info) *accesscontrol.AccessSet {
|
||||
return &accesscontrol.AccessSet{
|
||||
ID: "aabbccdd",
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockAccessSetLookup) PurgeUserData(_ string) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
@ -75,14 +75,15 @@ type Store struct {
|
||||
func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store {
|
||||
return &errorStore{
|
||||
Store: &WatchRefresh{
|
||||
Store: &partition.Store{
|
||||
Partitioner: &rbacPartitioner{
|
||||
Store: partition.NewStore(
|
||||
&rbacPartitioner{
|
||||
proxyStore: &Store{
|
||||
clientGetter: clientGetter,
|
||||
notifier: notifier,
|
||||
},
|
||||
},
|
||||
},
|
||||
lookup,
|
||||
),
|
||||
asl: lookup,
|
||||
},
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user