diff --git a/pkg/sqlcache/informer/informer.go b/pkg/sqlcache/informer/informer.go index f3b50611..1cb262fc 100644 --- a/pkg/sqlcache/informer/informer.go +++ b/pkg/sqlcache/informer/informer.go @@ -6,6 +6,7 @@ package informer import ( "context" + "sort" "time" "github.com/rancher/steve/pkg/sqlcache/db" @@ -31,7 +32,8 @@ type Informer struct { } type WatchOptions struct { - Filter WatchFilter + ResourceVersion string + Filter WatchFilter } type WatchFilter struct { @@ -63,6 +65,10 @@ func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [ listWatcher := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { a, err := client.List(ctx, options) + // We want the list to be consistent when there are going to be relists + sort.SliceStable(a.Items, func(i int, j int) bool { + return a.Items[i].GetResourceVersion() < a.Items[j].GetResourceVersion() + }) return a, err }, WatchFunc: watchFunc, diff --git a/pkg/sqlcache/informer/informer_test.go b/pkg/sqlcache/informer/informer_test.go index b030e1d4..c152dbc6 100644 --- a/pkg/sqlcache/informer/informer_test.go +++ b/pkg/sqlcache/informer/informer_test.go @@ -43,6 +43,7 @@ func TestNewInformer(t *testing.T) { // is tested in depth in its own package. txClient.EXPECT().Exec(gomock.Any()).Return(nil, nil) txClient.EXPECT().Exec(gomock.Any()).Return(nil, nil) + txClient.EXPECT().Exec(gomock.Any()).Return(nil, nil) dbClient.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(nil).Do( func(ctx context.Context, shouldEncrypt bool, f db.WithTransactionFunction) { err := f(txClient) @@ -154,6 +155,7 @@ func TestNewInformer(t *testing.T) { // is tested in depth in its own package. txClient.EXPECT().Exec(gomock.Any()).Return(nil, nil) txClient.EXPECT().Exec(gomock.Any()).Return(nil, nil) + txClient.EXPECT().Exec(gomock.Any()).Return(nil, nil) dbClient.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(nil).Do( func(ctx context.Context, shouldEncrypt bool, f db.WithTransactionFunction) { err := f(txClient) @@ -214,6 +216,7 @@ func TestNewInformer(t *testing.T) { // is tested in depth in its own package. txClient.EXPECT().Exec(gomock.Any()).Return(nil, nil) txClient.EXPECT().Exec(gomock.Any()).Return(nil, nil) + txClient.EXPECT().Exec(gomock.Any()).Return(nil, nil) dbClient.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(nil).Do( func(ctx context.Context, shouldEncrypt bool, f db.WithTransactionFunction) { err := f(txClient) diff --git a/pkg/sqlcache/informer/listoption_indexer.go b/pkg/sqlcache/informer/listoption_indexer.go index ba4c4eb1..4d587a99 100644 --- a/pkg/sqlcache/informer/listoption_indexer.go +++ b/pkg/sqlcache/informer/listoption_indexer.go @@ -1,11 +1,13 @@ package informer import ( + "bytes" "context" "database/sql" "encoding/gob" "errors" "fmt" + "reflect" "regexp" "sort" "strconv" @@ -33,9 +35,15 @@ type ListOptionIndexer struct { namespaced bool indexedFields []string + latestRVLock sync.RWMutex + latestRV string + watchersLock sync.RWMutex watchers map[*watchKey]*watcher + upsertEventsQuery string + findEventsRowByRVQuery string + listEventsAfterQuery string addFieldsQuery string deleteFieldsByKeyQuery string deleteFieldsQuery string @@ -43,6 +51,9 @@ type ListOptionIndexer struct { deleteLabelsByKeyQuery string deleteLabelsQuery string + upsertEventsStmt *sql.Stmt + findEventsRowByRVStmt *sql.Stmt + listEventsAfterStmt *sql.Stmt addFieldsStmt *sql.Stmt deleteFieldsByKeyStmt *sql.Stmt deleteFieldsStmt *sql.Stmt @@ -63,7 +74,24 @@ const ( matchFmt = `%%%s%%` strictMatchFmt = `%s` escapeBackslashDirective = ` ESCAPE '\'` // The leading space is crucial for unit tests only ' - createFieldsTableFmt = `CREATE TABLE "%s_fields" ( + + // RV stands for ResourceVersion + createEventsTableFmt = `CREATE TABLE "%s_events" ( + rv TEXT NOT NULL, + type TEXT NOT NULL, + event BLOB NOT NULL, + PRIMARY KEY (type, rv) + )` + listEventsAfterFmt = `SELECT type, rv, event + FROM "%s_events" + WHERE rowid > ? + ` + findEventsRowByRVFmt = `SELECT rowid + FROM "%s_events" + WHERE rv = ? + ` + + createFieldsTableFmt = `CREATE TABLE "%s_fields" ( key TEXT NOT NULL PRIMARY KEY, %s )` @@ -138,6 +166,12 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names setStatements := make([]string, len(indexedFields)) err = l.WithTransaction(ctx, true, func(tx transaction.Client) error { + createEventsTableQuery := fmt.Sprintf(createEventsTableFmt, dbName) + _, err = tx.Exec(createEventsTableQuery) + if err != nil { + return &db.QueryError{QueryString: createEventsTableFmt, Err: err} + } + _, err = tx.Exec(fmt.Sprintf(createFieldsTableFmt, dbName, strings.Join(columnDefs, ", "))) if err != nil { return err @@ -179,6 +213,18 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names return nil, err } + l.upsertEventsQuery = fmt.Sprintf( + `REPLACE INTO "%s_events"(rv, type, event) VALUES (?, ?, ?)`, + dbName, + ) + l.upsertEventsStmt = l.Prepare(l.upsertEventsQuery) + + l.listEventsAfterQuery = fmt.Sprintf(listEventsAfterFmt, dbName) + l.listEventsAfterStmt = l.Prepare(l.listEventsAfterQuery) + + l.findEventsRowByRVQuery = fmt.Sprintf(findEventsRowByRVFmt, dbName) + l.findEventsRowByRVStmt = l.Prepare(l.findEventsRowByRVQuery) + l.addFieldsQuery = fmt.Sprintf( `INSERT INTO "%s_fields"(key, %s) VALUES (?, %s) ON CONFLICT DO UPDATE SET %s`, dbName, @@ -204,10 +250,95 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names } func (l *ListOptionIndexer) Watch(ctx context.Context, opts WatchOptions, eventsCh chan<- watch.Event) error { - key := l.addWatcher(eventsCh, opts.Filter) + l.latestRVLock.RLock() + latestRV := l.latestRV + l.latestRVLock.RUnlock() + + targetRV := opts.ResourceVersion + if opts.ResourceVersion == "" { + targetRV = latestRV + } + + var events []watch.Event + var key *watchKey + // Even though we're not writing in this transaction, we prevent other writes to SQL + // because we don't want to add more events while we're backfilling events, so we don't miss events + err := l.WithTransaction(ctx, true, func(tx transaction.Client) error { + rowIDRows, err := tx.Stmt(l.findEventsRowByRVStmt).QueryContext(ctx, targetRV) + if err != nil { + return &db.QueryError{QueryString: l.listEventsAfterQuery, Err: err} + } + if !rowIDRows.Next() && targetRV != latestRV { + return fmt.Errorf("resourceversion too old") + } + + var rowID int + rowIDRows.Scan(&rowID) + + // Backfilling previous events from resourceVersion + rows, err := tx.Stmt(l.listEventsAfterStmt).QueryContext(ctx, rowID) + if err != nil { + return &db.QueryError{QueryString: l.listEventsAfterQuery, Err: err} + } + + for rows.Next() { + var typ, rv string + var buf sql.RawBytes + err := rows.Scan(&typ, &rv, &buf) + if err != nil { + return fmt.Errorf("scanning event row: %w", err) + } + + example := &unstructured.Unstructured{} + val, err := fromBytes(buf, reflect.TypeOf(example)) + if err != nil { + return fmt.Errorf("decoding event object: %w", err) + } + + obj, ok := val.Elem().Interface().(runtime.Object) + if !ok { + continue + } + + filter := opts.Filter + if !matchFilter(filter.ID, filter.Namespace, filter.Selector, obj) { + continue + } + + events = append(events, watch.Event{ + Type: watch.EventType(typ), + Object: val.Elem().Interface().(runtime.Object), + }) + } + + for _, event := range events { + eventsCh <- event + } + + key = l.addWatcher(eventsCh, opts.Filter) + return nil + }) <-ctx.Done() l.removeWatcher(key) - return nil + return err +} + +func toBytes(obj any) []byte { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(obj) + if err != nil { + panic(fmt.Errorf("error while gobbing object: %w", err)) + } + bb := buf.Bytes() + return bb +} + +func fromBytes(buf sql.RawBytes, typ reflect.Type) (reflect.Value, error) { + dec := gob.NewDecoder(bytes.NewReader(buf)) + singleResult := reflect.New(typ) + err := dec.DecodeValue(singleResult) + return singleResult, err } type watchKey struct { @@ -268,6 +399,24 @@ func (l *ListOptionIndexer) notifyEventDeleted(key string, obj any, tx transacti } func (l *ListOptionIndexer) notifyEvent(eventType watch.EventType, oldObj any, obj any, tx transaction.Client) error { + acc, err := meta.Accessor(obj) + if err != nil { + return err + } + + latestRV := acc.GetResourceVersion() + // Append a -d suffix because the RV might be the same as the previous object + // in the following case: + // - Add obj1 with RV 100 + // - Delete obj1 with RV 100 + if eventType == watch.Deleted { + latestRV = latestRV + "-d" + } + _, err = tx.Stmt(l.upsertEventsStmt).Exec(latestRV, eventType, toBytes(obj)) + if err != nil { + return &db.QueryError{QueryString: l.upsertEventsQuery, Err: err} + } + l.watchersLock.RLock() for _, watcher := range l.watchers { if !matchWatch(watcher.filter.ID, watcher.filter.Namespace, watcher.filter.Selector, oldObj, obj) { @@ -280,6 +429,10 @@ func (l *ListOptionIndexer) notifyEvent(eventType watch.EventType, oldObj any, o } } l.watchersLock.RUnlock() + + l.latestRVLock.Lock() + defer l.latestRVLock.Unlock() + l.latestRV = latestRV return nil } @@ -634,7 +787,11 @@ func (l *ListOptionIndexer) executeQuery(ctx context.Context, queryInfo *QueryIn continueToken = fmt.Sprintf("%d", offset+limit) } - return toUnstructuredList(items), total, continueToken, nil + l.latestRVLock.RLock() + latestRV := l.latestRV + l.latestRVLock.RUnlock() + + return toUnstructuredList(items, latestRV), total, continueToken, nil } func (l *ListOptionIndexer) validateColumn(column string) error { @@ -1063,12 +1220,15 @@ func isLabelsFieldList(fields []string) bool { } // toUnstructuredList turns a slice of unstructured objects into an unstructured.UnstructuredList -func toUnstructuredList(items []any) *unstructured.UnstructuredList { +func toUnstructuredList(items []any, resourceVersion string) *unstructured.UnstructuredList { objectItems := make([]any, len(items)) result := &unstructured.UnstructuredList{ Items: make([]unstructured.Unstructured, len(items)), Object: map[string]interface{}{"items": objectItems}, } + if resourceVersion != "" { + result.SetResourceVersion(resourceVersion) + } for i, item := range items { result.Items[i] = *item.(*unstructured.Unstructured) objectItems[i] = item.(*unstructured.Unstructured).Object diff --git a/pkg/sqlcache/informer/listoption_indexer_test.go b/pkg/sqlcache/informer/listoption_indexer_test.go index f087f2aa..d645a635 100644 --- a/pkg/sqlcache/informer/listoption_indexer_test.go +++ b/pkg/sqlcache/informer/listoption_indexer_test.go @@ -96,6 +96,8 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) + // create events table + txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) // create field table txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil) // create field table indexes @@ -196,6 +198,7 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) + txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsIndexFmt, id, "metadata.name", id, "metadata.name")).Return(nil, fmt.Errorf("error")) store.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(fmt.Errorf("error")).Do( @@ -236,6 +239,7 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) + txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsIndexFmt, id, "metadata.name", id, "metadata.name")).Return(nil, nil) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsIndexFmt, id, "metadata.namespace", id, "metadata.namespace")).Return(nil, nil) @@ -280,6 +284,7 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) + txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsIndexFmt, id, "metadata.name", id, "metadata.name")).Return(nil, nil) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsIndexFmt, id, "metadata.namespace", id, "metadata.namespace")).Return(nil, nil) @@ -1869,10 +1874,16 @@ func TestWatchMany(t *testing.T) { }, }, } - fooUpdated := foo.DeepCopy() - fooUpdated.SetLabels(map[string]string{ + foo.SetResourceVersion("100") + foo2 := foo.DeepCopy() + foo2.SetResourceVersion("120") + foo2.SetLabels(map[string]string{ "hello": "world", }) + foo3 := foo.DeepCopy() + foo3.SetResourceVersion("140") + foo4 := foo2.DeepCopy() + foo4.SetResourceVersion("160") err = loi.Add(foo) assert.NoError(t, err) @@ -1883,14 +1894,14 @@ func TestWatchMany(t *testing.T) { ctx2, cancel2 := context.WithCancel(context.Background()) watcher2, errCh2 := startWatcher(ctx2) - err = loi.Update(fooUpdated) + err = loi.Update(foo2) assert.NoError(t, err) events = receiveEvents(watcher1) - assert.Equal(t, []watch.Event{{Type: watch.Modified, Object: fooUpdated}}, events) + assert.Equal(t, []watch.Event{{Type: watch.Modified, Object: foo2}}, events) events = receiveEvents(watcher2) - assert.Equal(t, []watch.Event{{Type: watch.Modified, Object: fooUpdated}}, events) + assert.Equal(t, []watch.Event{{Type: watch.Modified, Object: foo2}}, events) watcher3, errCh3 := startWatcher(ctx) @@ -1898,18 +1909,18 @@ func TestWatchMany(t *testing.T) { err = waitStopWatcher(errCh2) assert.NoError(t, err) - err = loi.Delete(fooUpdated) + err = loi.Delete(foo2) assert.NoError(t, err) - err = loi.Add(foo) + err = loi.Add(foo3) assert.NoError(t, err) - err = loi.Update(fooUpdated) + err = loi.Update(foo4) assert.NoError(t, err) events = receiveEvents(watcher3) assert.Equal(t, []watch.Event{ - {Type: watch.Deleted, Object: fooUpdated}, - {Type: watch.Added, Object: foo}, - {Type: watch.Modified, Object: fooUpdated}, + {Type: watch.Deleted, Object: foo2}, + {Type: watch.Added, Object: foo3}, + {Type: watch.Modified, Object: foo4}, }, events) // Verify cancelled watcher don't receive anything anymore @@ -1918,9 +1929,9 @@ func TestWatchMany(t *testing.T) { events = receiveEvents(watcher1) assert.Equal(t, []watch.Event{ - {Type: watch.Deleted, Object: fooUpdated}, - {Type: watch.Added, Object: foo}, - {Type: watch.Modified, Object: fooUpdated}, + {Type: watch.Deleted, Object: foo2}, + {Type: watch.Added, Object: foo3}, + {Type: watch.Modified, Object: foo4}, }, events) cancel() @@ -2088,3 +2099,157 @@ func TestWatchFilter(t *testing.T) { } } + +func TestWatchResourceVersion(t *testing.T) { + startWatcher := func(ctx context.Context, loi *ListOptionIndexer, rv string) (chan watch.Event, chan error) { + errCh := make(chan error, 1) + eventsCh := make(chan watch.Event, 100) + go func() { + watchErr := loi.Watch(ctx, WatchOptions{ResourceVersion: rv}, eventsCh) + errCh <- watchErr + }() + time.Sleep(100 * time.Millisecond) + return eventsCh, errCh + } + + waitStopWatcher := func(errCh chan error) error { + select { + case <-time.After(time.Second * 5): + return fmt.Errorf("not finished in time") + case err := <-errCh: + return err + } + } + + receiveEvents := func(eventsCh chan watch.Event) []watch.Event { + timer := time.NewTimer(time.Millisecond * 50) + var events []watch.Event + for { + select { + case <-timer.C: + return events + case ev := <-eventsCh: + events = append(events, ev) + } + } + } + + foo := &unstructured.Unstructured{} + foo.SetResourceVersion("100") + foo.SetName("foo") + foo.SetNamespace("foo") + foo.SetLabels(map[string]string{ + "app": "foo", + }) + + fooUpdated := foo.DeepCopy() + fooUpdated.SetResourceVersion("120") + fooUpdated.SetLabels(map[string]string{ + "app": "changed", + }) + + bar := &unstructured.Unstructured{} + bar.SetResourceVersion("150") + bar.SetName("bar") + bar.SetNamespace("bar") + bar.SetLabels(map[string]string{ + "app": "bar", + }) + + barNew := &unstructured.Unstructured{} + barNew.SetResourceVersion("160") + barNew.SetName("bar") + barNew.SetNamespace("bar") + barNew.SetLabels(map[string]string{ + "app": "bar", + }) + + parentCtx := context.Background() + + loi, err := makeListOptionIndexer(parentCtx, [][]string{}) + assert.NoError(t, err) + + getRV := func(t *testing.T) string { + t.Helper() + list, _, _, err := loi.ListByOptions(parentCtx, &sqltypes.ListOptions{}, []partition.Partition{{All: true}}, "") + assert.NoError(t, err) + return list.GetResourceVersion() + } + + err = loi.Add(foo) + assert.NoError(t, err) + rv1 := getRV(t) + + err = loi.Update(fooUpdated) + assert.NoError(t, err) + rv2 := getRV(t) + + err = loi.Add(bar) + assert.NoError(t, err) + rv3 := getRV(t) + + err = loi.Delete(bar) + assert.NoError(t, err) + rv4 := getRV(t) + + err = loi.Add(barNew) + assert.NoError(t, err) + rv5 := getRV(t) + + tests := []struct { + rv string + expectedEvents []watch.Event + }{ + { + rv: "", + }, + { + rv: rv1, + expectedEvents: []watch.Event{ + {Type: watch.Modified, Object: fooUpdated}, + {Type: watch.Added, Object: bar}, + {Type: watch.Deleted, Object: bar}, + {Type: watch.Added, Object: barNew}, + }, + }, + { + rv: rv2, + expectedEvents: []watch.Event{ + {Type: watch.Added, Object: bar}, + {Type: watch.Deleted, Object: bar}, + {Type: watch.Added, Object: barNew}, + }, + }, + { + rv: rv3, + expectedEvents: []watch.Event{ + {Type: watch.Deleted, Object: bar}, + {Type: watch.Added, Object: barNew}, + }, + }, + { + rv: rv4, + expectedEvents: []watch.Event{ + {Type: watch.Added, Object: barNew}, + }, + }, + { + rv: rv5, + expectedEvents: nil, + }, + } + + for _, test := range tests { + t.Run(test.rv, func(t *testing.T) { + ctx, cancel := context.WithCancel(parentCtx) + watcherCh, errCh := startWatcher(ctx, loi, test.rv) + gotEvents := receiveEvents(watcherCh) + + assert.Equal(t, test.expectedEvents, gotEvents) + + cancel() + err := waitStopWatcher(errCh) + assert.NoError(t, err) + }) + } +} diff --git a/pkg/stores/sqlpartition/partition_mocks_test.go b/pkg/stores/sqlpartition/partition_mocks_test.go index 687b9006..ac8c9b33 100644 --- a/pkg/stores/sqlpartition/partition_mocks_test.go +++ b/pkg/stores/sqlpartition/partition_mocks_test.go @@ -143,10 +143,10 @@ func (mr *MockUnstructuredStoreMockRecorder) Delete(arg0, arg1, arg2 any) *gomoc } // ListByPartitions mocks base method. -func (m *MockUnstructuredStore) ListByPartitions(arg0 *types.APIRequest, arg1 *types.APISchema, arg2 []partition.Partition) ([]unstructured.Unstructured, int, string, error) { +func (m *MockUnstructuredStore) ListByPartitions(arg0 *types.APIRequest, arg1 *types.APISchema, arg2 []partition.Partition) (*unstructured.UnstructuredList, int, string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListByPartitions", arg0, arg1, arg2) - ret0, _ := ret[0].([]unstructured.Unstructured) + ret0, _ := ret[0].(*unstructured.UnstructuredList) ret1, _ := ret[1].(int) ret2, _ := ret[2].(string) ret3, _ := ret[3].(error) diff --git a/pkg/stores/sqlpartition/partitioner.go b/pkg/stores/sqlpartition/partitioner.go index b3b74f9b..a6f6a325 100644 --- a/pkg/stores/sqlpartition/partitioner.go +++ b/pkg/stores/sqlpartition/partitioner.go @@ -29,7 +29,7 @@ type UnstructuredStore interface { Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, []types.Warning, error) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) - ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) ([]unstructured.Unstructured, int, string, error) + ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) (*unstructured.UnstructuredList, int, string, error) WatchByPartitions(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest, partitions []partition.Partition) (chan watch.Event, error) } diff --git a/pkg/stores/sqlpartition/store.go b/pkg/stores/sqlpartition/store.go index f4ebb325..921d9654 100644 --- a/pkg/stores/sqlpartition/store.go +++ b/pkg/stores/sqlpartition/store.go @@ -92,7 +92,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP result.Count = total - for _, item := range list { + for _, item := range list.Items { item := item.DeepCopy() // the sql cache automatically adds the ID through a transformFunc. Because of this, we have a different set of reserved fields for the SQL cache result.Objects = append(result.Objects, partition.ToAPI(schema, item, nil, s.sqlReservedFields)) @@ -100,6 +100,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP result.Revision = "" result.Continue = continueToken + result.Revision = list.GetResourceVersion() return result, nil } diff --git a/pkg/stores/sqlpartition/store_test.go b/pkg/stores/sqlpartition/store_test.go index cb09b82f..e83557dc 100644 --- a/pkg/stores/sqlpartition/store_test.go +++ b/pkg/stores/sqlpartition/store_test.go @@ -51,16 +51,18 @@ func TestList(t *testing.T) { Schema: &schemas.Schema{}, } partitions := make([]partition.Partition, 0) - uListToReturn := []unstructured.Unstructured{ - { - Object: map[string]interface{}{ - "kind": "apple", - "metadata": map[string]interface{}{ - "name": "fuji", - "namespace": "fruitsnamespace", - }, - "data": map[string]interface{}{ - "color": "pink", + uListToReturn := &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "kind": "apple", + "metadata": map[string]interface{}{ + "name": "fuji", + "namespace": "fruitsnamespace", + }, + "data": map[string]interface{}{ + "color": "pink", + }, }, }, }, @@ -88,7 +90,7 @@ func TestList(t *testing.T) { } p.EXPECT().All(req, schema, "list", "").Return(partitions, nil) p.EXPECT().Store().Return(us) - us.EXPECT().ListByPartitions(req, schema, partitions).Return(uListToReturn, len(uListToReturn), "", nil) + us.EXPECT().ListByPartitions(req, schema, partitions).Return(uListToReturn, len(uListToReturn.Items), "", nil) l, err := s.List(req, schema) assert.Nil(t, err) assert.Equal(t, expectedAPIObjList, l) diff --git a/pkg/stores/sqlproxy/proxy_store.go b/pkg/stores/sqlproxy/proxy_store.go index c2d4221a..8e071a41 100644 --- a/pkg/stores/sqlproxy/proxy_store.go +++ b/pkg/stores/sqlproxy/proxy_store.go @@ -569,6 +569,7 @@ func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types. } opts := informer.WatchOptions{ + ResourceVersion: w.Revision, Filter: informer.WatchFilter{ ID: w.ID, Namespace: idNamespace, @@ -737,7 +738,7 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri // - the total number of resources (returned list might be a subset depending on pagination options in apiOp) // - a continue token, if there are more pages after the returned one // - an error instead of all of the above if anything went wrong -func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) ([]unstructured.Unstructured, int, string, error) { +func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) (*unstructured.UnstructuredList, int, string, error) { opts, err := listprocessor.ParseQuery(apiOp, s.namespaceCache) if err != nil { return nil, 0, "", err @@ -768,7 +769,7 @@ func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchem return nil, 0, "", err } - return list.Items, total, continueToken, nil + return list, total, continueToken, nil } // WatchByPartitions returns a channel of events for a list or resource belonging to any of the specified partitions diff --git a/pkg/stores/sqlproxy/proxy_store_test.go b/pkg/stores/sqlproxy/proxy_store_test.go index 891f97b4..e4605e1f 100644 --- a/pkg/stores/sqlproxy/proxy_store_test.go +++ b/pkg/stores/sqlproxy/proxy_store_test.go @@ -249,7 +249,7 @@ func TestListByPartitions(t *testing.T) { bloi.EXPECT().ListByOptions(req.Context(), &opts, partitions, req.Namespace).Return(listToReturn, len(listToReturn.Items), "", nil) list, total, contToken, err := s.ListByPartitions(req, schema, partitions) assert.Nil(t, err) - assert.Equal(t, expectedItems, list) + assert.Equal(t, expectedItems, list.Items) assert.Equal(t, len(expectedItems), total) assert.Equal(t, "", contToken) }, @@ -466,7 +466,7 @@ func TestListByPartitions(t *testing.T) { bloi.EXPECT().ListByOptions(req.Context(), &opts, partitions, req.Namespace).Return(listToReturn, len(listToReturn.Items), "", nil) list, total, contToken, err := s.ListByPartitions(req, schema, partitions) assert.Nil(t, err) - assert.Equal(t, expectedItems, list) + assert.Equal(t, expectedItems, list.Items) assert.Equal(t, len(expectedItems), total) assert.Equal(t, "", contToken) },