From e3f207ddc27daef90a33e2fa4e22ebbd9468aeb0 Mon Sep 17 00:00:00 2001 From: Tom Lebreux Date: Tue, 3 Jun 2025 16:07:18 -0600 Subject: [PATCH] Add basic watch functionality for SQL cache (#653) * Remove unused method * Add basic watch functionality * Remove TestWatchNamesErrReceive test --- pkg/sqlcache/informer/informer.go | 4 + pkg/sqlcache/informer/informer_mocks_test.go | 15 ++ pkg/sqlcache/informer/listoption_indexer.go | 59 ++++++++ .../informer/listoption_indexer_test.go | 143 ++++++++++++++++-- pkg/stores/sqlproxy/proxy_store.go | 114 +++++--------- pkg/stores/sqlproxy/proxy_store_test.go | 74 +++++---- .../sqlproxy/sql_informer_mocks_test.go | 16 ++ 7 files changed, 292 insertions(+), 133 deletions(-) diff --git a/pkg/sqlcache/informer/informer.go b/pkg/sqlcache/informer/informer.go index dd162662..926828f7 100644 --- a/pkg/sqlcache/informer/informer.go +++ b/pkg/sqlcache/informer/informer.go @@ -29,8 +29,12 @@ type Informer struct { ByOptionsLister } +type WatchOptions struct { +} + type ByOptionsLister interface { ListByOptions(ctx context.Context, lo *sqltypes.ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error) + Watch(ctx context.Context, opts WatchOptions, eventsCh chan<- watch.Event) error } // this is set to a var so that it can be overridden by test code for mocking purposes diff --git a/pkg/sqlcache/informer/informer_mocks_test.go b/pkg/sqlcache/informer/informer_mocks_test.go index 4cde0b67..38c23627 100644 --- a/pkg/sqlcache/informer/informer_mocks_test.go +++ b/pkg/sqlcache/informer/informer_mocks_test.go @@ -17,6 +17,7 @@ import ( sqltypes "github.com/rancher/steve/pkg/sqlcache/sqltypes" gomock "go.uber.org/mock/gomock" unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + watch "k8s.io/apimachinery/pkg/watch" ) // MockByOptionsLister is a mock of ByOptionsLister interface. @@ -58,3 +59,17 @@ func (mr *MockByOptionsListerMockRecorder) ListByOptions(arg0, arg1, arg2, arg3 mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByOptions", reflect.TypeOf((*MockByOptionsLister)(nil).ListByOptions), arg0, arg1, arg2, arg3) } + +// Watch mocks base method. +func (m *MockByOptionsLister) Watch(arg0 context.Context, arg1 WatchOptions, arg2 chan<- watch.Event) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Watch", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// Watch indicates an expected call of Watch. +func (mr *MockByOptionsListerMockRecorder) Watch(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockByOptionsLister)(nil).Watch), arg0, arg1, arg2) +} diff --git a/pkg/sqlcache/informer/listoption_indexer.go b/pkg/sqlcache/informer/listoption_indexer.go index 8898cd51..14ce8c51 100644 --- a/pkg/sqlcache/informer/listoption_indexer.go +++ b/pkg/sqlcache/informer/listoption_indexer.go @@ -10,11 +10,14 @@ import ( "sort" "strconv" "strings" + "sync" "github.com/rancher/steve/pkg/sqlcache/db/transaction" "github.com/rancher/steve/pkg/sqlcache/sqltypes" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "github.com/rancher/steve/pkg/sqlcache/db" @@ -28,6 +31,9 @@ type ListOptionIndexer struct { namespaced bool indexedFields []string + watchersLock sync.RWMutex + watchers map[*watchKey]chan<- watch.Event + addFieldsQuery string deleteFieldsByKeyQuery string deleteFieldsQuery string @@ -105,13 +111,17 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names Indexer: i, namespaced: namespaced, indexedFields: indexedFields, + watchers: make(map[*watchKey]chan<- watch.Event), } l.RegisterAfterAdd(l.addIndexFields) + l.RegisterAfterAdd(l.notifyEventAdded) l.RegisterAfterUpdate(l.addIndexFields) + l.RegisterAfterUpdate(l.notifyEventModified) l.RegisterAfterAdd(l.addLabels) l.RegisterAfterUpdate(l.addLabels) l.RegisterAfterDelete(l.deleteFieldsByKey) l.RegisterAfterDelete(l.deleteLabelsByKey) + l.RegisterAfterDelete(l.notifyEventDeleted) l.RegisterAfterDeleteAll(l.deleteFields) l.RegisterAfterDeleteAll(l.deleteLabels) columnDefs := make([]string, len(indexedFields)) @@ -191,8 +201,57 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names return l, nil } +func (l *ListOptionIndexer) Watch(ctx context.Context, opts WatchOptions, eventsCh chan<- watch.Event) error { + key := l.addWatcher(eventsCh) + <-ctx.Done() + l.removeWatcher(key) + return nil +} + +type watchKey struct { + _ bool // ensure watchKey is NOT zero-sized to get unique pointers +} + +func (l *ListOptionIndexer) addWatcher(eventCh chan<- watch.Event) *watchKey { + key := new(watchKey) + l.watchersLock.Lock() + l.watchers[key] = eventCh + l.watchersLock.Unlock() + return key +} + +func (l *ListOptionIndexer) removeWatcher(key *watchKey) { + l.watchersLock.Lock() + delete(l.watchers, key) + l.watchersLock.Unlock() +} + /* Core methods */ +func (l *ListOptionIndexer) notifyEventAdded(key string, obj any, tx transaction.Client) error { + return l.notifyEvent(watch.Added, obj, tx) +} + +func (l *ListOptionIndexer) notifyEventModified(key string, obj any, tx transaction.Client) error { + return l.notifyEvent(watch.Modified, obj, tx) +} + +func (l *ListOptionIndexer) notifyEventDeleted(key string, obj any, tx transaction.Client) error { + return l.notifyEvent(watch.Deleted, obj, tx) +} + +func (l *ListOptionIndexer) notifyEvent(eventType watch.EventType, obj any, tx transaction.Client) error { + l.watchersLock.RLock() + for _, watcher := range l.watchers { + watcher <- watch.Event{ + Type: eventType, + Object: obj.(runtime.Object), + } + } + l.watchersLock.RUnlock() + return nil +} + // addIndexFields saves sortable/filterable fields into tables func (l *ListOptionIndexer) addIndexFields(key string, obj any, tx transaction.Client) error { args := []any{key} diff --git a/pkg/sqlcache/informer/listoption_indexer_test.go b/pkg/sqlcache/informer/listoption_indexer_test.go index 7df40c21..b88bd916 100644 --- a/pkg/sqlcache/informer/listoption_indexer_test.go +++ b/pkg/sqlcache/informer/listoption_indexer_test.go @@ -12,6 +12,7 @@ import ( "errors" "fmt" "testing" + "time" "github.com/rancher/steve/pkg/sqlcache/db" "github.com/rancher/steve/pkg/sqlcache/encryption" @@ -24,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + watch "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" ) @@ -88,9 +90,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(2) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(2) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(2) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) // create field table @@ -156,9 +158,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(2) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(2) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(2) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) store.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(fmt.Errorf("error")) @@ -188,9 +190,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(2) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(2) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(2) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil) @@ -228,9 +230,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(2) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(2) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(2) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil) @@ -272,9 +274,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(2) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(2) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(2) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil) @@ -1816,3 +1818,114 @@ func TestGetField(t *testing.T) { }) } } + +func TestWatchMany(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + loi, err := makeListOptionIndexer(ctx, [][]string{{"metadata", "somefield"}}) + assert.NoError(t, err) + + startWatcher := func(ctx context.Context) (chan watch.Event, chan error) { + errCh := make(chan error, 1) + eventsCh := make(chan watch.Event, 100) + go func() { + watchErr := loi.Watch(ctx, WatchOptions{}, eventsCh) + errCh <- watchErr + }() + time.Sleep(100 * time.Millisecond) + return eventsCh, errCh + } + + waitStopWatcher := func(errCh chan error) error { + select { + case <-time.After(time.Second * 5): + return fmt.Errorf("not finished in time") + case err := <-errCh: + return err + } + } + + receiveEvents := func(eventsCh chan watch.Event) []watch.Event { + timer := time.NewTimer(time.Millisecond * 50) + var events []watch.Event + for { + select { + case <-timer.C: + return events + case ev := <-eventsCh: + events = append(events, ev) + } + } + } + watcher1, errCh1 := startWatcher(ctx) + events := receiveEvents(watcher1) + assert.Len(t, events, 0) + + foo := &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "name": "foo", + }, + }, + } + fooUpdated := foo.DeepCopy() + fooUpdated.SetLabels(map[string]string{ + "hello": "world", + }) + + err = loi.Add(foo) + assert.NoError(t, err) + + events = receiveEvents(watcher1) + assert.Equal(t, []watch.Event{{Type: watch.Added, Object: foo}}, events) + + ctx2, cancel2 := context.WithCancel(context.Background()) + watcher2, errCh2 := startWatcher(ctx2) + + err = loi.Update(fooUpdated) + assert.NoError(t, err) + + events = receiveEvents(watcher1) + assert.Equal(t, []watch.Event{{Type: watch.Modified, Object: fooUpdated}}, events) + + events = receiveEvents(watcher2) + assert.Equal(t, []watch.Event{{Type: watch.Modified, Object: fooUpdated}}, events) + + watcher3, errCh3 := startWatcher(ctx) + + cancel2() + err = waitStopWatcher(errCh2) + assert.NoError(t, err) + + err = loi.Delete(fooUpdated) + assert.NoError(t, err) + err = loi.Add(foo) + assert.NoError(t, err) + err = loi.Update(fooUpdated) + assert.NoError(t, err) + + events = receiveEvents(watcher3) + assert.Equal(t, []watch.Event{ + {Type: watch.Deleted, Object: fooUpdated}, + {Type: watch.Added, Object: foo}, + {Type: watch.Modified, Object: fooUpdated}, + }, events) + + // Verify cancelled watcher don't receive anything anymore + events = receiveEvents(watcher2) + assert.Len(t, events, 0) + + events = receiveEvents(watcher1) + assert.Equal(t, []watch.Event{ + {Type: watch.Deleted, Object: fooUpdated}, + {Type: watch.Added, Object: foo}, + {Type: watch.Modified, Object: fooUpdated}, + }, events) + + cancel() + err = waitStopWatcher(errCh1) + assert.NoError(t, err) + + err = waitStopWatcher(errCh3) + assert.NoError(t, err) +} diff --git a/pkg/stores/sqlproxy/proxy_store.go b/pkg/stores/sqlproxy/proxy_store.go index 528996e8..dbe8c15b 100644 --- a/pkg/stores/sqlproxy/proxy_store.go +++ b/pkg/stores/sqlproxy/proxy_store.go @@ -9,8 +9,6 @@ import ( "io" "io/ioutil" "net/http" - "os" - "strconv" "strings" "sync" @@ -251,6 +249,8 @@ type Store struct { lock sync.Mutex columnSetter SchemaColumnSetter transformBuilder TransformBuilder + + watchers *Watchers } type CacheFactoryInitializer func() (CacheFactory, error) @@ -268,6 +268,7 @@ func NewProxyStore(ctx context.Context, c SchemaColumnSetter, clientGetter Clien notifier: notifier, columnSetter: c, transformBuilder: virtual.NewTransformBuilder(scache), + watchers: newWatchers(), } if factory == nil { @@ -469,80 +470,6 @@ func returnErr(err error, c chan watch.Event) { } } -func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan watch.Event) { - rev := w.Revision - if rev == "-1" || rev == "0" { - rev = "" - } - - timeout := int64(60 * 30) - timeoutSetting := os.Getenv(watchTimeoutEnv) - if timeoutSetting != "" { - userSetTimeout, err := strconv.Atoi(timeoutSetting) - if err != nil { - logrus.Debugf("could not parse %s environment variable, error: %v", watchTimeoutEnv, err) - } else { - timeout = int64(userSetTimeout) - } - } - k8sClient, _ := metricsStore.Wrap(client, nil) - watcher, err := k8sClient.Watch(apiOp, metav1.ListOptions{ - Watch: true, - TimeoutSeconds: &timeout, - ResourceVersion: rev, - LabelSelector: w.Selector, - }) - if err != nil { - returnErr(errors.Wrapf(err, "stopping watch for %s: %v", schema.ID, err), result) - return - } - defer watcher.Stop() - logrus.Debugf("opening watcher for %s", schema.ID) - - eg, ctx := errgroup.WithContext(apiOp.Context()) - - go func() { - <-ctx.Done() - watcher.Stop() - }() - - if s.notifier != nil { - eg.Go(func() error { - for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) { - obj, _, err := s.byID(apiOp, schema, rel.Namespace, rel.Name) - if err == nil { - rowToObject(obj) - result <- watch.Event{Type: watch.Modified, Object: obj} - } else { - returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result) - } - } - return fmt.Errorf("closed") - }) - } - - eg.Go(func() error { - for event := range watcher.ResultChan() { - if event.Type == watch.Error { - if status, ok := event.Object.(*metav1.Status); ok { - returnErr(fmt.Errorf("event watch error: %s", status.Message), result) - } else { - logrus.Debugf("event watch error: could not decode event object %T", event.Object) - } - continue - } - if unstr, ok := event.Object.(*unstructured.Unstructured); ok { - rowToObject(unstr) - } - result <- event - } - return fmt.Errorf("closed") - }) - - _ = eg.Wait() - return -} - // WatchNames returns a channel of events filtered by an allowed set of names. // In plain kubernetes, if a user has permission to 'list' or 'watch' a defined set of resource names, // performing the list or watch will result in a Forbidden error, because the user does not have permission @@ -551,7 +478,7 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt // be returned in watch. func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.Set[string]) (chan watch.Event, error) { buffer := &WarningBuffer{} - adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace, buffer) + adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, "", buffer) if err != nil { return nil, err } @@ -592,17 +519,44 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t // Watch returns a channel of events for a list or resource. func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) { buffer := &WarningBuffer{} - client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace, buffer) + client, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, "", buffer) if err != nil { return nil, err } return s.watch(apiOp, schema, w, client) } +type Watchers struct { + watchers map[string]struct{} +} + +func newWatchers() *Watchers { + return &Watchers{ + watchers: make(map[string]struct{}), + } +} + func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, client dynamic.ResourceInterface) (chan watch.Event, error) { - result := make(chan watch.Event) + // warnings from inside the informer are discarded + gvk := attributes.GVK(schema) + fields := getFieldsFromSchema(schema) + fields = append(fields, getFieldForGVK(gvk)...) + transformFunc := s.transformBuilder.GetTransformFunc(gvk) + tableClient := &tablelistconvert.Client{ResourceInterface: client} + attrs := attributes.GVK(schema) + ns := attributes.Namespaced(schema) + inf, err := s.cacheFactory.CacheFor(s.ctx, fields, transformFunc, tableClient, attrs, ns, controllerschema.IsListWatchable(schema)) + if err != nil { + return nil, err + } + + result := make(chan watch.Event, 1000) go func() { - s.listAndWatch(apiOp, client, schema, w, result) + ctx := apiOp.Context() + err := inf.ByOptionsLister.Watch(ctx, informer.WatchOptions{}, result) + if err != nil { + logrus.Error(err) + } logrus.Debugf("closing watcher for %s", schema.ID) close(result) }() diff --git a/pkg/stores/sqlproxy/proxy_store_test.go b/pkg/stores/sqlproxy/proxy_store_test.go index 85cdb4ea..891f97b4 100644 --- a/pkg/stores/sqlproxy/proxy_store_test.go +++ b/pkg/stores/sqlproxy/proxy_store_test.go @@ -14,9 +14,12 @@ import ( "github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/resources/common" + "github.com/rancher/steve/pkg/sqlcache/db" + "github.com/rancher/steve/pkg/sqlcache/encryption" "github.com/rancher/steve/pkg/sqlcache/informer" "github.com/rancher/steve/pkg/sqlcache/informer/factory" "github.com/rancher/steve/pkg/sqlcache/partition" + "github.com/rancher/steve/pkg/sqlcache/store" "github.com/rancher/steve/pkg/stores/sqlpartition/listprocessor" "github.com/rancher/steve/pkg/stores/sqlproxy/tablelistconvert" "go.uber.org/mock/gomock" @@ -28,17 +31,16 @@ import ( "github.com/rancher/steve/pkg/client" "github.com/rancher/wrangler/v3/pkg/schemas" "github.com/stretchr/testify/assert" - "golang.org/x/sync/errgroup" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" schema2 "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/rest" clientgotesting "k8s.io/client-go/testing" + cache "k8s.io/client-go/tools/cache" ) //go:generate mockgen --build_flags=--mod=mod -package sqlproxy -destination ./proxy_mocks_test.go github.com/rancher/steve/pkg/stores/sqlproxy Cache,ClientGetter,CacheFactory,SchemaColumnSetter,RelationshipNotifier,TransformBuilder @@ -771,42 +773,6 @@ func TestReset(t *testing.T) { } } -func TestWatchNamesErrReceive(t *testing.T) { - testClientFactory, err := client.NewFactory(&rest.Config{}, false) - assert.Nil(t, err) - - fakeClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) - c = watch.NewFakeWithChanSize(5, true) - defer c.Stop() - errMsgsToSend := []string{"err1", "err2", "err3"} - c.Add(&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "testsecret1"}}) - for index := range errMsgsToSend { - c.Error(&metav1.Status{ - Message: errMsgsToSend[index], - }) - } - c.Add(&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "testsecret2"}}) - fakeClient.PrependWatchReactor("*", func(action clientgotesting.Action) (handled bool, ret watch.Interface, err error) { - return true, c, nil - }) - testStore := Store{ - clientGetter: &testFactory{Factory: testClientFactory, - fakeClient: fakeClient, - }, - } - apiSchema := &types.APISchema{Schema: &schemas.Schema{Attributes: map[string]interface{}{"table": "something"}}} - wc, err := testStore.WatchNames(&types.APIRequest{Namespace: "", Schema: apiSchema, Request: &http.Request{}}, apiSchema, types.WatchRequest{}, sets.New[string]("testsecret1", "testsecret2")) - assert.Nil(t, err) - - eg := errgroup.Group{} - eg.Go(func() error { return receiveUntil(wc, 5*time.Second) }) - - err = eg.Wait() - assert.Nil(t, err) - - assert.Equal(t, 0, len(c.ResultChan()), "Expected all secrets to have been received") -} - func (t *testFactory) TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string, warningHandler rest.WarningHandler) (dynamic.ResourceInterface, error) { return t.fakeClient.Resource(schema2.GroupVersionResource{}), nil } @@ -1556,3 +1522,35 @@ func TestUpdate(t *testing.T) { }) } } + +func makeListOptionIndexer(ctx context.Context, fields [][]string) (*informer.ListOptionIndexer, error) { + gvk := schema2.GroupVersionKind{ + Group: "", + Version: "", + Kind: "", + } + example := &unstructured.Unstructured{} + example.SetGroupVersionKind(gvk) + name := "theName" + m, err := encryption.NewManager() + if err != nil { + return nil, err + } + + db, err := db.NewClient(nil, m, m) + if err != nil { + return nil, err + } + + s, err := store.NewStore(ctx, example, cache.DeletionHandlingMetaNamespaceKeyFunc, db, false, name) + if err != nil { + return nil, err + } + + listOptionIndexer, err := informer.NewListOptionIndexer(ctx, fields, s, true) + if err != nil { + return nil, err + } + + return listOptionIndexer, nil +} diff --git a/pkg/stores/sqlproxy/sql_informer_mocks_test.go b/pkg/stores/sqlproxy/sql_informer_mocks_test.go index ca237f8c..3588449a 100644 --- a/pkg/stores/sqlproxy/sql_informer_mocks_test.go +++ b/pkg/stores/sqlproxy/sql_informer_mocks_test.go @@ -13,10 +13,12 @@ import ( context "context" reflect "reflect" + informer "github.com/rancher/steve/pkg/sqlcache/informer" partition "github.com/rancher/steve/pkg/sqlcache/partition" sqltypes "github.com/rancher/steve/pkg/sqlcache/sqltypes" gomock "go.uber.org/mock/gomock" unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + watch "k8s.io/apimachinery/pkg/watch" ) // MockByOptionsLister is a mock of ByOptionsLister interface. @@ -58,3 +60,17 @@ func (mr *MockByOptionsListerMockRecorder) ListByOptions(arg0, arg1, arg2, arg3 mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByOptions", reflect.TypeOf((*MockByOptionsLister)(nil).ListByOptions), arg0, arg1, arg2, arg3) } + +// Watch mocks base method. +func (m *MockByOptionsLister) Watch(arg0 context.Context, arg1 informer.WatchOptions, arg2 chan<- watch.Event) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Watch", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// Watch indicates an expected call of Watch. +func (mr *MockByOptionsListerMockRecorder) Watch(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockByOptionsLister)(nil).Watch), arg0, arg1, arg2) +}