mirror of
https://github.com/rancher/steve.git
synced 2025-04-27 19:05:09 +00:00
Retrieve exact revision regardless of cache state
Without this change, the cache is checked if a revision is specified, but the revision is ignored if the value is not found in the cache. This is a problem if by chance sequential paginated requests land on separate pods, because a different version number may be fetched on the sequent request and cause the caches to be inconsistent with one another. This change ensures that `revision` is passed as `resourceVersion` to the Kubernetes request, since `revision` has no meaning to Kubernetes.
This commit is contained in:
parent
7c0228e575
commit
fa7fb37245
@ -72,7 +72,7 @@ func indexOrZero(partitions []Partition, name string) int {
|
||||
// List returns a stream of objects up to the requested limit.
|
||||
// If the continue token is not empty, it decodes it and returns the stream
|
||||
// starting at the indicated marker.
|
||||
func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []unstructured.Unstructured, error) {
|
||||
func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume, revision string) (<-chan []unstructured.Unstructured, error) {
|
||||
var state listState
|
||||
if resume != "" {
|
||||
bytes, err := base64.StdEncoding.DecodeString(resume)
|
||||
@ -86,6 +86,8 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st
|
||||
if state.Limit > 0 {
|
||||
limit = state.Limit
|
||||
}
|
||||
} else {
|
||||
state.Revision = revision
|
||||
}
|
||||
|
||||
result := make(chan []unstructured.Unstructured)
|
||||
|
@ -136,7 +136,10 @@ func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, sche
|
||||
|
||||
values := req.Request.URL.Query()
|
||||
values.Set("continue", cont)
|
||||
values.Set("revision", revision)
|
||||
if revision != "" && cont == "" {
|
||||
values.Set("resourceVersion", revision)
|
||||
values.Set("resourceVersionMatch", "Exact") // supported since k8s 1.19
|
||||
}
|
||||
if limit > 0 {
|
||||
values.Set("limit", strconv.Itoa(limit))
|
||||
} else {
|
||||
@ -184,11 +187,17 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
|
||||
}
|
||||
}
|
||||
if list == nil { // did not look in cache or was not found in cache
|
||||
stream, err := lister.List(apiOp.Context(), opts.ChunkSize, opts.Resume)
|
||||
stream, err := lister.List(apiOp.Context(), opts.ChunkSize, opts.Resume, opts.Revision)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
list = listprocessor.FilterList(stream, opts.Filters)
|
||||
// Check for any errors returned during the parallel listing requests.
|
||||
// We don't want to cache the list or bother with further processing if the list is empty or corrupt.
|
||||
// FilterList guarantees that the stream has been consumed and the error is populated if there is any.
|
||||
if lister.Err() != nil {
|
||||
return result, lister.Err()
|
||||
}
|
||||
list = listprocessor.SortList(list, opts.Sort)
|
||||
key.revision = lister.Revision()
|
||||
listToCache := &unstructured.UnstructuredList{
|
||||
|
@ -1743,7 +1743,7 @@ func TestList(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
schema := &types.APISchema{Schema: &schemas.Schema{ID: "apple"}}
|
||||
stores := map[string]*mockStore{}
|
||||
stores := map[string]UnstructuredStore{}
|
||||
for _, partitions := range test.partitions {
|
||||
for _, p := range partitions {
|
||||
stores[p.Name()] = &mockStore{
|
||||
@ -1775,7 +1775,7 @@ func TestList(t *testing.T) {
|
||||
}
|
||||
if len(test.wantListCalls) > 0 {
|
||||
for name, _ := range store.Partitioner.(mockPartitioner).stores {
|
||||
assert.Equal(t, test.wantListCalls[i][name], store.Partitioner.(mockPartitioner).stores[name].called)
|
||||
assert.Equal(t, test.wantListCalls[i][name], store.Partitioner.(mockPartitioner).stores[name].(*mockStore).called)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1783,8 +1783,74 @@ func TestList(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestListByRevision(t *testing.T) {
|
||||
|
||||
schema := &types.APISchema{Schema: &schemas.Schema{ID: "apple"}}
|
||||
asl := &mockAccessSetLookup{userRoles: []map[string]string{
|
||||
{
|
||||
"user1": "roleA",
|
||||
},
|
||||
{
|
||||
"user1": "roleA",
|
||||
},
|
||||
}}
|
||||
store := NewStore(mockPartitioner{
|
||||
stores: map[string]UnstructuredStore{
|
||||
"all": &mockVersionedStore{
|
||||
versions: []mockStore{
|
||||
{
|
||||
contents: &unstructured.UnstructuredList{
|
||||
Object: map[string]interface{}{
|
||||
"metadata": map[string]interface{}{
|
||||
"resourceVersion": "1",
|
||||
},
|
||||
},
|
||||
Items: []unstructured.Unstructured{
|
||||
newApple("fuji").Unstructured,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
contents: &unstructured.UnstructuredList{
|
||||
Object: map[string]interface{}{
|
||||
"metadata": map[string]interface{}{
|
||||
"resourceVersion": "2",
|
||||
},
|
||||
},
|
||||
Items: []unstructured.Unstructured{
|
||||
newApple("fuji").Unstructured,
|
||||
newApple("granny-smith").Unstructured,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
partitions: map[string][]Partition{
|
||||
"user1": {
|
||||
mockPartition{
|
||||
name: "all",
|
||||
},
|
||||
},
|
||||
},
|
||||
}, asl)
|
||||
req := newRequest("", "user1")
|
||||
t.Setenv("CATTLE_REQUEST_CACHE_DISABLED", "Y")
|
||||
|
||||
got, gotErr := store.List(req, schema)
|
||||
assert.Nil(t, gotErr)
|
||||
wantVersion := "2"
|
||||
assert.Equal(t, wantVersion, got.Revision)
|
||||
|
||||
req = newRequest("revision=1", "user1")
|
||||
got, gotErr = store.List(req, schema)
|
||||
assert.Nil(t, gotErr)
|
||||
wantVersion = "1"
|
||||
assert.Equal(t, wantVersion, got.Revision)
|
||||
}
|
||||
|
||||
type mockPartitioner struct {
|
||||
stores map[string]*mockStore
|
||||
stores map[string]UnstructuredStore
|
||||
partitions map[string][]Partition
|
||||
}
|
||||
|
||||
@ -1868,6 +1934,49 @@ func (m *mockStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, w ty
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
type mockVersionedStore struct {
|
||||
mockStore
|
||||
versions []mockStore
|
||||
}
|
||||
|
||||
func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) {
|
||||
m.called++
|
||||
query, _ := url.ParseQuery(apiOp.Request.URL.RawQuery)
|
||||
rv := len(m.versions) - 1
|
||||
if query.Get("resourceVersion") != "" {
|
||||
rv, _ = strconv.Atoi(query.Get("resourceVersion"))
|
||||
rv--
|
||||
}
|
||||
l := query.Get("limit")
|
||||
if l == "" {
|
||||
return m.versions[rv].contents, nil
|
||||
}
|
||||
i := 0
|
||||
if c := query.Get("continue"); c != "" {
|
||||
start, _ := base64.StdEncoding.DecodeString(c)
|
||||
for j, obj := range m.versions[rv].contents.Items {
|
||||
if string(start) == obj.GetName() {
|
||||
i = j
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
lInt, _ := strconv.Atoi(l)
|
||||
contents := m.versions[rv].contents.DeepCopy()
|
||||
if len(contents.Items) > i+lInt {
|
||||
contents.SetContinue(base64.StdEncoding.EncodeToString([]byte(contents.Items[i+lInt].GetName())))
|
||||
}
|
||||
if i > len(contents.Items) {
|
||||
return contents, nil
|
||||
}
|
||||
if i+lInt > len(contents.Items) {
|
||||
contents.Items = contents.Items[i:]
|
||||
return contents, nil
|
||||
}
|
||||
contents.Items = contents.Items[i : i+lInt]
|
||||
return contents, nil
|
||||
}
|
||||
|
||||
type mockCache struct {
|
||||
contents map[cacheKey]*unstructured.UnstructuredList
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user