diff --git a/pkg/stores/partition/parallel.go b/pkg/stores/partition/parallel.go index 03c8089..35403aa 100644 --- a/pkg/stores/partition/parallel.go +++ b/pkg/stores/partition/parallel.go @@ -72,7 +72,7 @@ func indexOrZero(partitions []Partition, name string) int { // List returns a stream of objects up to the requested limit. // If the continue token is not empty, it decodes it and returns the stream // starting at the indicated marker. -func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []unstructured.Unstructured, error) { +func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume, revision string) (<-chan []unstructured.Unstructured, error) { var state listState if resume != "" { bytes, err := base64.StdEncoding.DecodeString(resume) @@ -86,6 +86,8 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st if state.Limit > 0 { limit = state.Limit } + } else { + state.Revision = revision } result := make(chan []unstructured.Unstructured) diff --git a/pkg/stores/partition/store.go b/pkg/stores/partition/store.go index ca9c9c7..1efbda7 100644 --- a/pkg/stores/partition/store.go +++ b/pkg/stores/partition/store.go @@ -136,7 +136,10 @@ func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, sche values := req.Request.URL.Query() values.Set("continue", cont) - values.Set("revision", revision) + if revision != "" && cont == "" { + values.Set("resourceVersion", revision) + values.Set("resourceVersionMatch", "Exact") // supported since k8s 1.19 + } if limit > 0 { values.Set("limit", strconv.Itoa(limit)) } else { @@ -184,11 +187,17 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP } } if list == nil { // did not look in cache or was not found in cache - stream, err := lister.List(apiOp.Context(), opts.ChunkSize, opts.Resume) + stream, err := lister.List(apiOp.Context(), opts.ChunkSize, opts.Resume, opts.Revision) if err != nil { return result, err } list = listprocessor.FilterList(stream, opts.Filters) + // Check for any errors returned during the parallel listing requests. + // We don't want to cache the list or bother with further processing if the list is empty or corrupt. + // FilterList guarantees that the stream has been consumed and the error is populated if there is any. + if lister.Err() != nil { + return result, lister.Err() + } list = listprocessor.SortList(list, opts.Sort) key.revision = lister.Revision() listToCache := &unstructured.UnstructuredList{ diff --git a/pkg/stores/partition/store_test.go b/pkg/stores/partition/store_test.go index c0b2b46..42c8b27 100644 --- a/pkg/stores/partition/store_test.go +++ b/pkg/stores/partition/store_test.go @@ -1743,7 +1743,7 @@ func TestList(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { schema := &types.APISchema{Schema: &schemas.Schema{ID: "apple"}} - stores := map[string]*mockStore{} + stores := map[string]UnstructuredStore{} for _, partitions := range test.partitions { for _, p := range partitions { stores[p.Name()] = &mockStore{ @@ -1775,7 +1775,7 @@ func TestList(t *testing.T) { } if len(test.wantListCalls) > 0 { for name, _ := range store.Partitioner.(mockPartitioner).stores { - assert.Equal(t, test.wantListCalls[i][name], store.Partitioner.(mockPartitioner).stores[name].called) + assert.Equal(t, test.wantListCalls[i][name], store.Partitioner.(mockPartitioner).stores[name].(*mockStore).called) } } } @@ -1783,8 +1783,74 @@ func TestList(t *testing.T) { } } +func TestListByRevision(t *testing.T) { + + schema := &types.APISchema{Schema: &schemas.Schema{ID: "apple"}} + asl := &mockAccessSetLookup{userRoles: []map[string]string{ + { + "user1": "roleA", + }, + { + "user1": "roleA", + }, + }} + store := NewStore(mockPartitioner{ + stores: map[string]UnstructuredStore{ + "all": &mockVersionedStore{ + versions: []mockStore{ + { + contents: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "resourceVersion": "1", + }, + }, + Items: []unstructured.Unstructured{ + newApple("fuji").Unstructured, + }, + }, + }, + { + contents: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "metadata": map[string]interface{}{ + "resourceVersion": "2", + }, + }, + Items: []unstructured.Unstructured{ + newApple("fuji").Unstructured, + newApple("granny-smith").Unstructured, + }, + }, + }, + }, + }, + }, + partitions: map[string][]Partition{ + "user1": { + mockPartition{ + name: "all", + }, + }, + }, + }, asl) + req := newRequest("", "user1") + t.Setenv("CATTLE_REQUEST_CACHE_DISABLED", "Y") + + got, gotErr := store.List(req, schema) + assert.Nil(t, gotErr) + wantVersion := "2" + assert.Equal(t, wantVersion, got.Revision) + + req = newRequest("revision=1", "user1") + got, gotErr = store.List(req, schema) + assert.Nil(t, gotErr) + wantVersion = "1" + assert.Equal(t, wantVersion, got.Revision) +} + type mockPartitioner struct { - stores map[string]*mockStore + stores map[string]UnstructuredStore partitions map[string][]Partition } @@ -1868,6 +1934,49 @@ func (m *mockStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, w ty panic("not implemented") } +type mockVersionedStore struct { + mockStore + versions []mockStore +} + +func (m *mockVersionedStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) { + m.called++ + query, _ := url.ParseQuery(apiOp.Request.URL.RawQuery) + rv := len(m.versions) - 1 + if query.Get("resourceVersion") != "" { + rv, _ = strconv.Atoi(query.Get("resourceVersion")) + rv-- + } + l := query.Get("limit") + if l == "" { + return m.versions[rv].contents, nil + } + i := 0 + if c := query.Get("continue"); c != "" { + start, _ := base64.StdEncoding.DecodeString(c) + for j, obj := range m.versions[rv].contents.Items { + if string(start) == obj.GetName() { + i = j + break + } + } + } + lInt, _ := strconv.Atoi(l) + contents := m.versions[rv].contents.DeepCopy() + if len(contents.Items) > i+lInt { + contents.SetContinue(base64.StdEncoding.EncodeToString([]byte(contents.Items[i+lInt].GetName()))) + } + if i > len(contents.Items) { + return contents, nil + } + if i+lInt > len(contents.Items) { + contents.Items = contents.Items[i:] + return contents, nil + } + contents.Items = contents.Items[i : i+lInt] + return contents, nil +} + type mockCache struct { contents map[cacheKey]*unstructured.UnstructuredList }