From 4bd760b22df73832ccd558b445ccbf4ec36e7b03 Mon Sep 17 00:00:00 2001 From: Tom Lebreux Date: Fri, 5 Sep 2025 11:30:56 -0400 Subject: [PATCH] Fix race condition with database reset (#804) * Wait for garbage collection to be finished before creating a new DB * Prevent DB reset while inflight List requests * Rename cacheFor to cacheForLocked * Rename Reset to Stop * Mention DoneWithCache in README * Add RunGC to ByOptionsLister interface --- pkg/sqlcache/Readme.md | 4 + .../informer/factory/informer_factory.go | 36 ++++-- .../informer/factory/informer_factory_test.go | 53 ++++++--- .../factory/sql_informer_mocks_test.go | 103 ++++++++++++++++++ pkg/sqlcache/informer/informer.go | 18 +++ pkg/sqlcache/informer/informer_mocks_test.go | 12 ++ pkg/sqlcache/informer/listoption_indexer.go | 19 +++- .../informer/listoption_indexer_test.go | 2 + pkg/sqlcache/integration_test.go | 7 +- pkg/stores/sqlproxy/proxy_mocks_test.go | 28 +++-- pkg/stores/sqlproxy/proxy_store.go | 17 ++- pkg/stores/sqlproxy/proxy_store_test.go | 79 ++++++++------ .../sqlproxy/sql_informer_mocks_test.go | 12 ++ 13 files changed, 311 insertions(+), 79 deletions(-) create mode 100644 pkg/sqlcache/informer/factory/sql_informer_mocks_test.go diff --git a/pkg/sqlcache/Readme.md b/pkg/sqlcache/Readme.md index e971c298..5b9df0d7 100644 --- a/pkg/sqlcache/Readme.md +++ b/pkg/sqlcache/Readme.md @@ -82,6 +82,10 @@ intended to be used as a way of enforcing RBAC. if err != nil { panic(err) } + // Don't forget to let the cache factory know you're done with + // the cache. This unlocks a global lock used to prevent + // Stop()ing the cache when requests are in flight. + defer cacheFactory.DoneWithCache(c) // continueToken will just be an offset that can be used in Resume on a subsequent request to continue // to next page diff --git a/pkg/sqlcache/informer/factory/informer_factory.go b/pkg/sqlcache/informer/factory/informer_factory.go index 5661c8a9..97cdd065 100644 --- a/pkg/sqlcache/informer/factory/informer_factory.go +++ b/pkg/sqlcache/informer/factory/informer_factory.go @@ -108,11 +108,20 @@ func NewCacheFactory(opts CacheFactoryOptions) (*CacheFactory, error) { // CacheFor returns an informer for given GVK, using sql store indexed with fields, using the specified client. For virtual fields, they must be added by the transform function // and specified by fields to be used for later fields. -func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (Cache, error) { +// +// Don't forget to call DoneWithCache with the given informer once done with it. +func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*Cache, error) { // First of all block Reset() until we are done f.mutex.RLock() - defer f.mutex.RUnlock() + cache, err := f.cacheForLocked(ctx, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, namespaced, watchable) + if err != nil { + f.mutex.RUnlock() + return nil, err + } + return cache, nil +} +func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*Cache, error) { // Second, check if the informer and its accompanying informer-specific mutex exist already in the informers cache // If not, start by creating such informer-specific mutex. That is used later to ensure no two goroutines create // informers for the same GVK at the same type @@ -146,7 +155,7 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external shouldEncrypt := f.encryptAll || encryptResourceAlways i, err := f.newInformer(f.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, f.gcInterval, f.gcKeepCount) if err != nil { - return Cache{}, err + return nil, err } err = i.SetWatchErrorHandler(func(r *cache.Reflector, err error) { @@ -157,7 +166,7 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external cache.DefaultWatchErrorHandler(ctx, r, err) }) if err != nil { - return Cache{}, err + return nil, err } f.wg.StartWithChannel(f.ctx.Done(), i.Run) @@ -166,16 +175,25 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external } if !cache.WaitForCacheSync(f.ctx.Done(), gi.informer.HasSynced) { - return Cache{}, fmt.Errorf("failed to sync SQLite Informer cache for GVK %v", gvk) + return nil, fmt.Errorf("failed to sync SQLite Informer cache for GVK %v", gvk) } // At this point the informer is ready, return it - return Cache{ByOptionsLister: gi.informer}, nil + return &Cache{ByOptionsLister: gi.informer}, nil } -// Reset cancels ctx which stops any running informers, assigns a new ctx, resets the GVK-informer cache, and resets +// DoneWithCache must be called for every CacheFor call. +// +// This ensures that there aren't any inflight list requests while we are resetting the database. +// +// TODO: Use the *Cache once we go per-GVK +func (f *CacheFactory) DoneWithCache(_ *Cache) { + f.mutex.RUnlock() +} + +// Stop cancels ctx which stops any running informers, assigns a new ctx, resets the GVK-informer cache, and resets // the database connection which wipes any current sqlite database at the default location. -func (f *CacheFactory) Reset() error { +func (f *CacheFactory) Stop() error { if f.dbClient == nil { // nothing to reset return nil @@ -187,8 +205,8 @@ func (f *CacheFactory) Reset() error { // now that we are alone, stop all informers created until this point f.cancel() - f.ctx, f.cancel = context.WithCancel(context.Background()) f.wg.Wait() + f.ctx, f.cancel = context.WithCancel(context.Background()) // and get rid of all references to those informers and their mutexes f.informersMutex.Lock() diff --git a/pkg/sqlcache/informer/factory/informer_factory_test.go b/pkg/sqlcache/informer/factory/informer_factory_test.go index f1d09c70..8cf471a1 100644 --- a/pkg/sqlcache/informer/factory/informer_factory_test.go +++ b/pkg/sqlcache/informer/factory/informer_factory_test.go @@ -21,6 +21,7 @@ import ( //go:generate go run go.uber.org/mock/mockgen@latest --build_flags=--mod=mod -package factory -destination ./transaction_mocks_tests.go -mock_names Client=MockTXClient github.com/rancher/steve/pkg/sqlcache/db/transaction Client //go:generate go run go.uber.org/mock/mockgen@latest --build_flags=--mod=mod -package factory -destination ./dynamic_mocks_test.go k8s.io/client-go/dynamic ResourceInterface //go:generate go run go.uber.org/mock/mockgen@latest --build_flags=--mod=mod -package factory -destination ./k8s_cache_mocks_test.go k8s.io/client-go/tools/cache SharedIndexInformer +//go:generate go run go.uber.org/mock/mockgen@latest --build_flags=--mod=mod -package factory -destination ./sql_informer_mocks_test.go github.com/rancher/steve/pkg/sqlcache/informer ByOptionsLister func TestNewCacheFactory(t *testing.T) { type testCase struct { @@ -64,6 +65,8 @@ func TestCacheFor(t *testing.T) { dynamicClient := NewMockResourceInterface(gomock.NewController(t)) fields := [][]string{{"something"}} expectedGVK := schema.GroupVersionKind{} + bloi := NewMockByOptionsLister(gomock.NewController(t)) + bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true).AnyTimes() sii.EXPECT().Run(gomock.Any()).MinTimes(1) @@ -71,8 +74,9 @@ func TestCacheFor(t *testing.T) { i := &informer.Informer{ // need to set this so Run function is not nil SharedIndexInformer: sii, + ByOptionsLister: bloi, } - expectedC := Cache{ + expectedC := &Cache{ ByOptionsLister: i, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { @@ -97,7 +101,7 @@ func TestCacheFor(t *testing.T) { time.Sleep(5 * time.Second) f.cancel() }() - var c Cache + var c *Cache var err error c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) @@ -114,6 +118,8 @@ func TestCacheFor(t *testing.T) { fields := [][]string{{"something"}} expectedGVK := schema.GroupVersionKind{} + bloi := NewMockByOptionsLister(gomock.NewController(t)) + bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(false).AnyTimes() sii.EXPECT().Run(gomock.Any()) @@ -121,6 +127,7 @@ func TestCacheFor(t *testing.T) { expectedI := &informer.Informer{ // need to set this so Run function is not nil SharedIndexInformer: sii, + ByOptionsLister: bloi, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) @@ -154,6 +161,8 @@ func TestCacheFor(t *testing.T) { fields := [][]string{{"something"}} expectedGVK := schema.GroupVersionKind{} + bloi := NewMockByOptionsLister(gomock.NewController(t)) + bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true).AnyTimes() // may or may not call run initially @@ -162,8 +171,9 @@ func TestCacheFor(t *testing.T) { i := &informer.Informer{ // need to set this so Run function is not nil SharedIndexInformer: sii, + ByOptionsLister: bloi, } - expectedC := Cache{ + expectedC := &Cache{ ByOptionsLister: i, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { @@ -184,7 +194,7 @@ func TestCacheFor(t *testing.T) { f.ctx, f.cancel = context.WithCancel(context.Background()) f.cancel() - var c Cache + var c *Cache var err error c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) @@ -196,6 +206,8 @@ func TestCacheFor(t *testing.T) { dynamicClient := NewMockResourceInterface(gomock.NewController(t)) fields := [][]string{{"something"}} expectedGVK := schema.GroupVersionKind{} + bloi := NewMockByOptionsLister(gomock.NewController(t)) + bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true) sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes() @@ -203,8 +215,9 @@ func TestCacheFor(t *testing.T) { i := &informer.Informer{ // need to set this so Run function is not nil SharedIndexInformer: sii, + ByOptionsLister: bloi, } - expectedC := Cache{ + expectedC := &Cache{ ByOptionsLister: i, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { @@ -229,7 +242,7 @@ func TestCacheFor(t *testing.T) { time.Sleep(10 * time.Second) f.cancel() }() - var c Cache + var c *Cache var err error c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) @@ -246,6 +259,8 @@ func TestCacheFor(t *testing.T) { Version: "v1", Kind: "Secret", } + bloi := NewMockByOptionsLister(gomock.NewController(t)) + bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true) sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes() @@ -253,8 +268,9 @@ func TestCacheFor(t *testing.T) { i := &informer.Informer{ // need to set this so Run function is not nil SharedIndexInformer: sii, + ByOptionsLister: bloi, } - expectedC := Cache{ + expectedC := &Cache{ ByOptionsLister: i, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { @@ -279,7 +295,7 @@ func TestCacheFor(t *testing.T) { time.Sleep(10 * time.Second) f.cancel() }() - var c Cache + var c *Cache var err error c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) @@ -295,6 +311,8 @@ func TestCacheFor(t *testing.T) { Version: "v3", Kind: "Token", } + bloi := NewMockByOptionsLister(gomock.NewController(t)) + bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true) sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes() @@ -302,8 +320,9 @@ func TestCacheFor(t *testing.T) { i := &informer.Informer{ // need to set this so Run function is not nil SharedIndexInformer: sii, + ByOptionsLister: bloi, } - expectedC := Cache{ + expectedC := &Cache{ ByOptionsLister: i, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { @@ -328,7 +347,7 @@ func TestCacheFor(t *testing.T) { time.Sleep(10 * time.Second) f.cancel() }() - var c Cache + var c *Cache var err error c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) @@ -341,6 +360,8 @@ func TestCacheFor(t *testing.T) { dynamicClient := NewMockResourceInterface(gomock.NewController(t)) fields := [][]string{{"something"}} expectedGVK := schema.GroupVersionKind{} + bloi := NewMockByOptionsLister(gomock.NewController(t)) + bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true) sii.EXPECT().Run(gomock.Any()).MinTimes(1) @@ -351,8 +372,9 @@ func TestCacheFor(t *testing.T) { i := &informer.Informer{ // need to set this so Run function is not nil SharedIndexInformer: sii, + ByOptionsLister: bloi, } - expectedC := Cache{ + expectedC := &Cache{ ByOptionsLister: i, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { @@ -385,7 +407,7 @@ func TestCacheFor(t *testing.T) { time.Sleep(5 * time.Second) f.cancel() }() - var c Cache + var c *Cache var err error c, err = f.CacheFor(context.Background(), fields, nil, nil, transformFunc, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) @@ -397,6 +419,8 @@ func TestCacheFor(t *testing.T) { dynamicClient := NewMockResourceInterface(gomock.NewController(t)) fields := [][]string{{"something"}} expectedGVK := schema.GroupVersionKind{} + bloi := NewMockByOptionsLister(gomock.NewController(t)) + bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true) sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes() @@ -404,8 +428,9 @@ func TestCacheFor(t *testing.T) { i := &informer.Informer{ // need to set this so Run function is not nil SharedIndexInformer: sii, + ByOptionsLister: bloi, } - expectedC := Cache{ + expectedC := &Cache{ ByOptionsLister: i, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { @@ -433,7 +458,7 @@ func TestCacheFor(t *testing.T) { time.Sleep(10 * time.Second) f.cancel() }() - var c Cache + var c *Cache var err error // CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) diff --git a/pkg/sqlcache/informer/factory/sql_informer_mocks_test.go b/pkg/sqlcache/informer/factory/sql_informer_mocks_test.go new file mode 100644 index 00000000..421dd62f --- /dev/null +++ b/pkg/sqlcache/informer/factory/sql_informer_mocks_test.go @@ -0,0 +1,103 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/rancher/steve/pkg/sqlcache/informer (interfaces: ByOptionsLister) +// +// Generated by this command: +// +// mockgen --build_flags=--mod=mod -package factory -destination ./sql_informer_mocks_test.go github.com/rancher/steve/pkg/sqlcache/informer ByOptionsLister +// + +// Package factory is a generated GoMock package. +package factory + +import ( + context "context" + reflect "reflect" + + informer "github.com/rancher/steve/pkg/sqlcache/informer" + partition "github.com/rancher/steve/pkg/sqlcache/partition" + sqltypes "github.com/rancher/steve/pkg/sqlcache/sqltypes" + gomock "go.uber.org/mock/gomock" + unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + watch "k8s.io/apimachinery/pkg/watch" +) + +// MockByOptionsLister is a mock of ByOptionsLister interface. +type MockByOptionsLister struct { + ctrl *gomock.Controller + recorder *MockByOptionsListerMockRecorder + isgomock struct{} +} + +// MockByOptionsListerMockRecorder is the mock recorder for MockByOptionsLister. +type MockByOptionsListerMockRecorder struct { + mock *MockByOptionsLister +} + +// NewMockByOptionsLister creates a new mock instance. +func NewMockByOptionsLister(ctrl *gomock.Controller) *MockByOptionsLister { + mock := &MockByOptionsLister{ctrl: ctrl} + mock.recorder = &MockByOptionsListerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockByOptionsLister) EXPECT() *MockByOptionsListerMockRecorder { + return m.recorder +} + +// GetLatestResourceVersion mocks base method. +func (m *MockByOptionsLister) GetLatestResourceVersion() []string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLatestResourceVersion") + ret0, _ := ret[0].([]string) + return ret0 +} + +// GetLatestResourceVersion indicates an expected call of GetLatestResourceVersion. +func (mr *MockByOptionsListerMockRecorder) GetLatestResourceVersion() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLatestResourceVersion", reflect.TypeOf((*MockByOptionsLister)(nil).GetLatestResourceVersion)) +} + +// ListByOptions mocks base method. +func (m *MockByOptionsLister) ListByOptions(ctx context.Context, lo *sqltypes.ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListByOptions", ctx, lo, partitions, namespace) + ret0, _ := ret[0].(*unstructured.UnstructuredList) + ret1, _ := ret[1].(int) + ret2, _ := ret[2].(string) + ret3, _ := ret[3].(error) + return ret0, ret1, ret2, ret3 +} + +// ListByOptions indicates an expected call of ListByOptions. +func (mr *MockByOptionsListerMockRecorder) ListByOptions(ctx, lo, partitions, namespace any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByOptions", reflect.TypeOf((*MockByOptionsLister)(nil).ListByOptions), ctx, lo, partitions, namespace) +} + +// RunGC mocks base method. +func (m *MockByOptionsLister) RunGC(arg0 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RunGC", arg0) +} + +// RunGC indicates an expected call of RunGC. +func (mr *MockByOptionsListerMockRecorder) RunGC(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunGC", reflect.TypeOf((*MockByOptionsLister)(nil).RunGC), arg0) +} + +// Watch mocks base method. +func (m *MockByOptionsLister) Watch(ctx context.Context, options informer.WatchOptions, eventsCh chan<- watch.Event) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Watch", ctx, options, eventsCh) + ret0, _ := ret[0].(error) + return ret0 +} + +// Watch indicates an expected call of Watch. +func (mr *MockByOptionsListerMockRecorder) Watch(ctx, options, eventsCh any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockByOptionsLister)(nil).Watch), ctx, options, eventsCh) +} diff --git a/pkg/sqlcache/informer/informer.go b/pkg/sqlcache/informer/informer.go index 6556578f..9b46bf1e 100644 --- a/pkg/sqlcache/informer/informer.go +++ b/pkg/sqlcache/informer/informer.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" @@ -49,6 +50,7 @@ type ByOptionsLister interface { ListByOptions(ctx context.Context, lo *sqltypes.ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error) Watch(ctx context.Context, options WatchOptions, eventsCh chan<- watch.Event) error GetLatestResourceVersion() []string + RunGC(context.Context) } // this is set to a var so that it can be overridden by test code for mocking purposes @@ -138,6 +140,22 @@ func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [ }, nil } +// Run implements [cache.SharedIndexInformer] +func (i *Informer) Run(stopCh <-chan struct{}) { + var wg wait.Group + wg.StartWithChannel(stopCh, i.SharedIndexInformer.Run) + wg.StartWithContext(wait.ContextForChannel(stopCh), i.ByOptionsLister.RunGC) + wg.Wait() +} + +// RunWithContext implements [cache.SharedIndexInformer] +func (i *Informer) RunWithContext(ctx context.Context) { + var wg wait.Group + wg.StartWithContext(ctx, i.SharedIndexInformer.RunWithContext) + wg.StartWithContext(ctx, i.ByOptionsLister.RunGC) + wg.Wait() +} + // ListByOptions returns objects according to the specified list options and partitions. // Specifically: // - an unstructured list of resources belonging to any of the specified partitions diff --git a/pkg/sqlcache/informer/informer_mocks_test.go b/pkg/sqlcache/informer/informer_mocks_test.go index dc3b2ea8..6bc72099 100644 --- a/pkg/sqlcache/informer/informer_mocks_test.go +++ b/pkg/sqlcache/informer/informer_mocks_test.go @@ -75,6 +75,18 @@ func (mr *MockByOptionsListerMockRecorder) ListByOptions(ctx, lo, partitions, na return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByOptions", reflect.TypeOf((*MockByOptionsLister)(nil).ListByOptions), ctx, lo, partitions, namespace) } +// RunGC mocks base method. +func (m *MockByOptionsLister) RunGC(arg0 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RunGC", arg0) +} + +// RunGC indicates an expected call of RunGC. +func (mr *MockByOptionsListerMockRecorder) RunGC(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunGC", reflect.TypeOf((*MockByOptionsLister)(nil).RunGC), arg0) +} + // Watch mocks base method. func (m *MockByOptionsLister) Watch(ctx context.Context, options WatchOptions, eventsCh chan<- watch.Event) error { m.ctrl.T.Helper() diff --git a/pkg/sqlcache/informer/listoption_indexer.go b/pkg/sqlcache/informer/listoption_indexer.go index f897aded..b667af0e 100644 --- a/pkg/sqlcache/informer/listoption_indexer.go +++ b/pkg/sqlcache/informer/listoption_indexer.go @@ -44,6 +44,11 @@ type ListOptionIndexer struct { watchersLock sync.RWMutex watchers map[*watchKey]*watcher + // gcInterval is how often to run the garbage collection + gcInterval time.Duration + // gcKeepCount is how many events to keep in _events table when gc runs + gcKeepCount int + upsertEventsQuery string findEventsRowByRVQuery string listEventsAfterQuery string @@ -282,7 +287,8 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp l.deleteLabelsByKeyStmt = l.Prepare(l.deleteLabelsByKeyQuery) l.deleteLabelsStmt = l.Prepare(l.deleteLabelsQuery) - go l.runGC(ctx, opts.GCInterval, opts.GCKeepCount) + l.gcInterval = opts.GCInterval + l.gcKeepCount = opts.GCKeepCount return l, nil } @@ -1555,21 +1561,22 @@ func matchFilter(filterName string, filterNamespace string, filterSelector label return true } -func (l *ListOptionIndexer) runGC(ctx context.Context, interval time.Duration, keepCount int) { - if interval == 0 || keepCount == 0 { +func (l *ListOptionIndexer) RunGC(ctx context.Context) { + if l.gcInterval == 0 || l.gcKeepCount == 0 { return } - ticker := time.NewTicker(interval) + ticker := time.NewTicker(l.gcInterval) defer ticker.Stop() - logrus.Infof("Started SQL cache garbage collection for %s (interval=%s, keep=%d)", l.GetName(), interval, keepCount) + logrus.Infof("Started SQL cache garbage collection for %s (interval=%s, keep=%d)", l.GetName(), l.gcInterval, l.gcKeepCount) + defer logrus.Infof("Stopped SQL cache garbage collection for %s (interval=%s, keep=%d)", l.GetName(), l.gcInterval, l.gcKeepCount) for { select { case <-ticker.C: err := l.WithTransaction(ctx, true, func(tx transaction.Client) error { - _, err := tx.Stmt(l.deleteEventsByCountStmt).Exec(keepCount) + _, err := tx.Stmt(l.deleteEventsByCountStmt).Exec(l.gcKeepCount) if err != nil { return &db.QueryError{QueryString: l.deleteEventsByCountQuery, Err: err} } diff --git a/pkg/sqlcache/informer/listoption_indexer_test.go b/pkg/sqlcache/informer/listoption_indexer_test.go index d14bf0ef..9410a8cf 100644 --- a/pkg/sqlcache/informer/listoption_indexer_test.go +++ b/pkg/sqlcache/informer/listoption_indexer_test.go @@ -100,6 +100,8 @@ func makeListOptionIndexer(ctx context.Context, opts ListOptionIndexerOptions, s return nil, "", err } + go listOptionIndexer.RunGC(ctx) + return listOptionIndexer, dbPath, nil } diff --git a/pkg/sqlcache/integration_test.go b/pkg/sqlcache/integration_test.go index 0eaca071..33b91fba 100644 --- a/pkg/sqlcache/integration_test.go +++ b/pkg/sqlcache/integration_test.go @@ -95,7 +95,10 @@ func (i *IntegrationSuite) TestSQLCacheFilters() { cache, cacheFactory, err := i.createCacheAndFactory(fields, nil) require.NoError(err) - defer cacheFactory.Reset() + defer func() { + cacheFactory.DoneWithCache(cache) + cacheFactory.Stop() + }() // doesn't match the filter for somekey == somevalue notMatches := configMapWithAnnotations("not-matches-filter", map[string]string{"somekey": "notequal"}) @@ -327,7 +330,7 @@ func (i *IntegrationSuite) createCacheAndFactory(fields [][]string, transformFun if err != nil { return nil, nil, fmt.Errorf("unable to make cache: %w", err) } - return &cache, cacheFactory, nil + return cache, cacheFactory, nil } func (i *IntegrationSuite) waitForCacheReady(readyResourceNames []string, namespace string, cache *factory.Cache) error { diff --git a/pkg/stores/sqlproxy/proxy_mocks_test.go b/pkg/stores/sqlproxy/proxy_mocks_test.go index d954ede8..7c8ce5b9 100644 --- a/pkg/stores/sqlproxy/proxy_mocks_test.go +++ b/pkg/stores/sqlproxy/proxy_mocks_test.go @@ -267,10 +267,10 @@ func (m *MockCacheFactory) EXPECT() *MockCacheFactoryMockRecorder { } // CacheFor mocks base method. -func (m *MockCacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced, watchable bool) (factory.Cache, error) { +func (m *MockCacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced, watchable bool) (*factory.Cache, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CacheFor", ctx, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, namespaced, watchable) - ret0, _ := ret[0].(factory.Cache) + ret0, _ := ret[0].(*factory.Cache) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -281,18 +281,30 @@ func (mr *MockCacheFactoryMockRecorder) CacheFor(ctx, fields, externalUpdateInfo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CacheFor", reflect.TypeOf((*MockCacheFactory)(nil).CacheFor), ctx, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, namespaced, watchable) } -// Reset mocks base method. -func (m *MockCacheFactory) Reset() error { +// DoneWithCache mocks base method. +func (m *MockCacheFactory) DoneWithCache(arg0 *factory.Cache) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Reset") + m.ctrl.Call(m, "DoneWithCache", arg0) +} + +// DoneWithCache indicates an expected call of DoneWithCache. +func (mr *MockCacheFactoryMockRecorder) DoneWithCache(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoneWithCache", reflect.TypeOf((*MockCacheFactory)(nil).DoneWithCache), arg0) +} + +// Stop mocks base method. +func (m *MockCacheFactory) Stop() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Stop") ret0, _ := ret[0].(error) return ret0 } -// Reset indicates an expected call of Reset. -func (mr *MockCacheFactoryMockRecorder) Reset() *gomock.Call { +// Stop indicates an expected call of Stop. +func (mr *MockCacheFactoryMockRecorder) Stop() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockCacheFactory)(nil).Reset)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockCacheFactory)(nil).Stop)) } // MockSchemaColumnSetter is a mock of SchemaColumnSetter interface. diff --git a/pkg/stores/sqlproxy/proxy_store.go b/pkg/stores/sqlproxy/proxy_store.go index db50a0e6..7160c4b1 100644 --- a/pkg/stores/sqlproxy/proxy_store.go +++ b/pkg/stores/sqlproxy/proxy_store.go @@ -289,7 +289,7 @@ type Store struct { notifier RelationshipNotifier cacheFactory CacheFactory cfInitializer CacheFactoryInitializer - namespaceCache Cache + namespaceCache *factory.Cache lock sync.Mutex columnSetter SchemaColumnSetter transformBuilder TransformBuilder @@ -300,8 +300,9 @@ type Store struct { type CacheFactoryInitializer func() (CacheFactory, error) type CacheFactory interface { - CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (factory.Cache, error) - Reset() error + CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*factory.Cache, error) + DoneWithCache(*factory.Cache) + Stop() error } // NewProxyStore returns a Store implemented directly on top of kubernetes. @@ -336,7 +337,10 @@ func NewProxyStore(ctx context.Context, c SchemaColumnSetter, clientGetter Clien func (s *Store) Reset() error { s.lock.Lock() defer s.lock.Unlock() - if err := s.cacheFactory.Reset(); err != nil { + if s.namespaceCache != nil { + s.cacheFactory.DoneWithCache(s.namespaceCache) + } + if err := s.cacheFactory.Stop(); err != nil { return fmt.Errorf("reset: %w", err) } @@ -595,6 +599,10 @@ func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types. if err != nil { return nil, err } + // FIXME: This needs to be called when the watch ends, not at the end of + // this func. However, there's currently a bug where we don't correctly + // shutdown watches, so for now, we do this. + defer s.cacheFactory.DoneWithCache(inf) var selector labels.Selector if w.Selector != "" { @@ -803,6 +811,7 @@ func (s *Store) ListByPartitions(apiOp *types.APIRequest, apiSchema *types.APISc if err != nil { return nil, 0, "", fmt.Errorf("cachefor %v: %w", gvk, err) } + defer s.cacheFactory.DoneWithCache(inf) opts, err := listprocessor.ParseQuery(apiOp, gvk.Kind) if err != nil { diff --git a/pkg/stores/sqlproxy/proxy_store_test.go b/pkg/stores/sqlproxy/proxy_store_test.go index 3a109ca5..95540aa9 100644 --- a/pkg/stores/sqlproxy/proxy_store_test.go +++ b/pkg/stores/sqlproxy/proxy_store_test.go @@ -75,7 +75,7 @@ func TestNewProxyStore(t *testing.T) { cf := NewMockCacheFactory(gomock.NewController(t)) ri := NewMockResourceInterface(gomock.NewController(t)) bloi := NewMockByOptionsLister(gomock.NewController(t)) - c := factory.Cache{ + c := &factory.Cache{ ByOptionsLister: &informer.Informer{ ByOptionsLister: bloi, }, @@ -167,7 +167,7 @@ func TestNewProxyStore(t *testing.T) { nsSchema := baseNSSchema scc.EXPECT().SetColumns(context.Background(), &nsSchema).Return(nil) cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil) - cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(factory.Cache{}, fmt.Errorf("error")) + cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nil, fmt.Errorf("error")) s, err := NewProxyStore(context.Background(), scc, cg, rn, nil, cf, true) assert.Nil(t, err) @@ -194,7 +194,6 @@ func TestListByPartitions(t *testing.T) { description: "client ListByPartitions() with no errors returned should return no errors. Should pass fields" + " from schema.", test: func(t *testing.T) { - nsi := NewMockCache(gomock.NewController(t)) cg := NewMockClientGetter(gomock.NewController(t)) cf := NewMockCacheFactory(gomock.NewController(t)) ri := NewMockResourceInterface(gomock.NewController(t)) @@ -203,12 +202,12 @@ func TestListByPartitions(t *testing.T) { inf := &informer.Informer{ ByOptionsLister: bloi, } - c := factory.Cache{ + c := &factory.Cache{ ByOptionsLister: inf, } s := &Store{ ctx: context.Background(), - namespaceCache: nsi, + namespaceCache: &factory.Cache{ByOptionsLister: bloi}, clientGetter: cg, cacheFactory: cf, transformBuilder: tb, @@ -261,6 +260,7 @@ func TestListByPartitions(t *testing.T) { cg.EXPECT().TableAdminClient(req, schema, "", &WarningBuffer{}).Return(ri, nil) // This tests that fields are being extracted from schema columns and the type specific fields map cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), true).Return(c, nil) + cf.EXPECT().DoneWithCache(c) tb.EXPECT().GetTransformFunc(attributes.GVK(schema), []common.ColumnDefinition{{Field: "some.field"}}, false).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) bloi.EXPECT().ListByOptions(req.Context(), &opts, partitions, req.Namespace).Return(listToReturn, len(listToReturn.Items), "", nil) list, total, contToken, err := s.ListByPartitions(req, schema, partitions) @@ -274,7 +274,9 @@ func TestListByPartitions(t *testing.T) { description: "client ListByPartitions() with no errors returned should return no errors. Should pass fields" + " from schema.", test: func(t *testing.T) { - nsi := NewMockCache(gomock.NewController(t)) + nsi := &factory.Cache{ + ByOptionsLister: NewMockByOptionsLister(gomock.NewController(t)), + } cg := NewMockClientGetter(gomock.NewController(t)) cf := NewMockCacheFactory(gomock.NewController(t)) tb := NewMockTransformBuilder(gomock.NewController(t)) @@ -340,21 +342,23 @@ func TestListByPartitions(t *testing.T) { tests = append(tests, testCase{ description: "client ListByPartitions() should detect listable-but-unwatchable schema, still work normally", test: func(t *testing.T) { - nsi := NewMockCache(gomock.NewController(t)) cg := NewMockClientGetter(gomock.NewController(t)) cf := NewMockCacheFactory(gomock.NewController(t)) ri := NewMockResourceInterface(gomock.NewController(t)) bloi := NewMockByOptionsLister(gomock.NewController(t)) + nsi := factory.Cache{ + ByOptionsLister: bloi, + } tb := NewMockTransformBuilder(gomock.NewController(t)) inf := &informer.Informer{ ByOptionsLister: bloi, } - c := factory.Cache{ + c := &factory.Cache{ ByOptionsLister: inf, } s := &Store{ ctx: context.Background(), - namespaceCache: nsi, + namespaceCache: &nsi, clientGetter: cg, cacheFactory: cf, transformBuilder: tb, @@ -410,6 +414,7 @@ func TestListByPartitions(t *testing.T) { // This tests that fields are being extracted from schema columns and the type specific fields map // note also the watchable bool is expected to be false cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), false).Return(c, nil) + cf.EXPECT().DoneWithCache(c) tb.EXPECT().GetTransformFunc(attributes.GVK(schema), []common.ColumnDefinition{{Field: "some.field"}}, false).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) bloi.EXPECT().ListByOptions(req.Context(), &opts, partitions, req.Namespace).Return(listToReturn, len(listToReturn.Items), "", nil) @@ -423,7 +428,9 @@ func TestListByPartitions(t *testing.T) { tests = append(tests, testCase{ description: "client ListByPartitions() with CacheFor() error returned should return an errors. Should pass fields", test: func(t *testing.T) { - nsi := NewMockCache(gomock.NewController(t)) + nsi := factory.Cache{ + ByOptionsLister: NewMockByOptionsLister(gomock.NewController(t)), + } cg := NewMockClientGetter(gomock.NewController(t)) cf := NewMockCacheFactory(gomock.NewController(t)) ri := NewMockResourceInterface(gomock.NewController(t)) @@ -431,7 +438,7 @@ func TestListByPartitions(t *testing.T) { s := &Store{ ctx: context.Background(), - namespaceCache: nsi, + namespaceCache: &nsi, clientGetter: cg, cacheFactory: cf, transformBuilder: tb, @@ -484,7 +491,7 @@ func TestListByPartitions(t *testing.T) { cg.EXPECT().TableAdminClient(req, schema, "", &WarningBuffer{}).Return(ri, nil) // This tests that fields are being extracted from schema columns and the type specific fields map tb.EXPECT().GetTransformFunc(attributes.GVK(schema), gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) - cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), true).Return(factory.Cache{}, fmt.Errorf("error")) + cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), true).Return(nil, fmt.Errorf("error")) _, _, _, err = s.ListByPartitions(req, schema, partitions) assert.NotNil(t, err) @@ -494,7 +501,9 @@ func TestListByPartitions(t *testing.T) { description: "client ListByPartitions() with ListByOptions() error returned should return an errors. Should pass fields" + " from schema.", test: func(t *testing.T) { - nsi := NewMockCache(gomock.NewController(t)) + nsi := factory.Cache{ + ByOptionsLister: NewMockByOptionsLister(gomock.NewController(t)), + } cg := NewMockClientGetter(gomock.NewController(t)) cf := NewMockCacheFactory(gomock.NewController(t)) ri := NewMockResourceInterface(gomock.NewController(t)) @@ -503,12 +512,12 @@ func TestListByPartitions(t *testing.T) { inf := &informer.Informer{ ByOptionsLister: bloi, } - c := factory.Cache{ + c := &factory.Cache{ ByOptionsLister: inf, } s := &Store{ ctx: context.Background(), - namespaceCache: nsi, + namespaceCache: &nsi, clientGetter: cg, cacheFactory: cf, transformBuilder: tb, @@ -561,6 +570,7 @@ func TestListByPartitions(t *testing.T) { cg.EXPECT().TableAdminClient(req, schema, "", &WarningBuffer{}).Return(ri, nil) // This tests that fields are being extracted from schema columns and the type specific fields map cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {`id`}, {`metadata`, `state`, `name`}, {"gvk", "specific", "fields"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(schema), attributes.Namespaced(schema), true).Return(c, nil) + cf.EXPECT().DoneWithCache(c) bloi.EXPECT().ListByOptions(req.Context(), &opts, partitions, req.Namespace).Return(nil, 0, "", fmt.Errorf("error")) tb.EXPECT().GetTransformFunc(attributes.GVK(schema), gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) @@ -610,21 +620,23 @@ func TestListByPartitionWithUserAccess(t *testing.T) { t.Parallel() for _, test := range tests { t.Run(test.description, func(t *testing.T) { - nsi := NewMockCache(gomock.NewController(t)) cg := NewMockClientGetter(gomock.NewController(t)) cf := NewMockCacheFactory(gomock.NewController(t)) ri := NewMockResourceInterface(gomock.NewController(t)) bloi := NewMockByOptionsLister(gomock.NewController(t)) + nsi := factory.Cache{ + ByOptionsLister: bloi, + } tb := NewMockTransformBuilder(gomock.NewController(t)) inf := &informer.Informer{ ByOptionsLister: bloi, } - c := factory.Cache{ + c := &factory.Cache{ ByOptionsLister: inf, } s := &Store{ ctx: context.Background(), - namespaceCache: nsi, + namespaceCache: &nsi, clientGetter: cg, cacheFactory: cf, transformBuilder: tb, @@ -674,6 +686,7 @@ func TestListByPartitionWithUserAccess(t *testing.T) { attributes.SetGVK(theSchema, gvk) cg.EXPECT().TableAdminClient(apiOp, theSchema, "", &WarningBuffer{}).Return(ri, nil) cf.EXPECT().CacheFor(context.Background(), [][]string{{"some", "field"}, {"id"}, {"metadata", "state", "name"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(theSchema), attributes.Namespaced(theSchema), true).Return(c, nil) + cf.EXPECT().DoneWithCache(c) tb.EXPECT().GetTransformFunc(attributes.GVK(theSchema), gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) listToReturn := &unstructured.UnstructuredList{ @@ -695,13 +708,14 @@ func TestReset(t *testing.T) { tests = append(tests, testCase{ description: "client Reset() with no errors returned should return no errors.", test: func(t *testing.T) { - nsc := NewMockCache(gomock.NewController(t)) + nsc := &factory.Cache{ + ByOptionsLister: NewMockByOptionsLister(gomock.NewController(t)), + } cg := NewMockClientGetter(gomock.NewController(t)) cf := NewMockCacheFactory(gomock.NewController(t)) cs := NewMockSchemaColumnSetter(gomock.NewController(t)) ri := NewMockResourceInterface(gomock.NewController(t)) tb := NewMockTransformBuilder(gomock.NewController(t)) - nsc2 := factory.Cache{} s := &Store{ ctx: context.Background(), namespaceCache: nsc, @@ -712,20 +726,20 @@ func TestReset(t *testing.T) { transformBuilder: tb, } nsSchema := baseNSSchema - cf.EXPECT().Reset().Return(nil) + cf.EXPECT().Stop().Return(nil) cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil) cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil) - cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nsc2, nil) + cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nsc, nil) + cf.EXPECT().DoneWithCache(nsc) tb.EXPECT().GetTransformFunc(attributes.GVK(&nsSchema), gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) err := s.Reset() assert.Nil(t, err) - assert.Equal(t, nsc2, s.namespaceCache) + assert.Equal(t, nsc, s.namespaceCache) }, }) tests = append(tests, testCase{ description: "client Reset() with cache factory Reset() error returned, should return an error.", test: func(t *testing.T) { - nsi := NewMockCache(gomock.NewController(t)) cg := NewMockClientGetter(gomock.NewController(t)) cf := NewMockCacheFactory(gomock.NewController(t)) cs := NewMockSchemaColumnSetter(gomock.NewController(t)) @@ -733,7 +747,6 @@ func TestReset(t *testing.T) { s := &Store{ ctx: context.Background(), - namespaceCache: nsi, clientGetter: cg, cacheFactory: cf, columnSetter: cs, @@ -741,7 +754,7 @@ func TestReset(t *testing.T) { transformBuilder: tb, } - cf.EXPECT().Reset().Return(fmt.Errorf("error")) + cf.EXPECT().Stop().Return(fmt.Errorf("error")) err := s.Reset() assert.NotNil(t, err) }, @@ -749,7 +762,6 @@ func TestReset(t *testing.T) { tests = append(tests, testCase{ description: "client Reset() with column setter error returned, should return an error.", test: func(t *testing.T) { - nsi := NewMockCache(gomock.NewController(t)) cg := NewMockClientGetter(gomock.NewController(t)) cf := NewMockCacheFactory(gomock.NewController(t)) cs := NewMockSchemaColumnSetter(gomock.NewController(t)) @@ -757,7 +769,6 @@ func TestReset(t *testing.T) { s := &Store{ ctx: context.Background(), - namespaceCache: nsi, clientGetter: cg, cacheFactory: cf, columnSetter: cs, @@ -765,7 +776,7 @@ func TestReset(t *testing.T) { transformBuilder: tb, } - cf.EXPECT().Reset().Return(nil) + cf.EXPECT().Stop().Return(nil) cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(fmt.Errorf("error")) err := s.Reset() assert.NotNil(t, err) @@ -774,7 +785,6 @@ func TestReset(t *testing.T) { tests = append(tests, testCase{ description: "client Reset() with column getter TableAdminClient() error returned, should return an error.", test: func(t *testing.T) { - nsi := NewMockCache(gomock.NewController(t)) cg := NewMockClientGetter(gomock.NewController(t)) cf := NewMockCacheFactory(gomock.NewController(t)) cs := NewMockSchemaColumnSetter(gomock.NewController(t)) @@ -782,7 +792,6 @@ func TestReset(t *testing.T) { s := &Store{ ctx: context.Background(), - namespaceCache: nsi, clientGetter: cg, cacheFactory: cf, columnSetter: cs, @@ -791,7 +800,7 @@ func TestReset(t *testing.T) { } nsSchema := baseNSSchema - cf.EXPECT().Reset().Return(nil) + cf.EXPECT().Stop().Return(nil) cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil) cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(nil, fmt.Errorf("error")) err := s.Reset() @@ -801,7 +810,6 @@ func TestReset(t *testing.T) { tests = append(tests, testCase{ description: "client Reset() with cache factory CacheFor() error returned, should return an error.", test: func(t *testing.T) { - nsc := NewMockCache(gomock.NewController(t)) cg := NewMockClientGetter(gomock.NewController(t)) cf := NewMockCacheFactory(gomock.NewController(t)) cs := NewMockSchemaColumnSetter(gomock.NewController(t)) @@ -810,7 +818,6 @@ func TestReset(t *testing.T) { s := &Store{ ctx: context.Background(), - namespaceCache: nsc, clientGetter: cg, cacheFactory: cf, columnSetter: cs, @@ -819,10 +826,10 @@ func TestReset(t *testing.T) { } nsSchema := baseNSSchema - cf.EXPECT().Reset().Return(nil) + cf.EXPECT().Stop().Return(nil) cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil) cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil) - cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(factory.Cache{}, fmt.Errorf("error")) + cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nil, fmt.Errorf("error")) tb.EXPECT().GetTransformFunc(attributes.GVK(&nsSchema), gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) err := s.Reset() assert.NotNil(t, err) diff --git a/pkg/stores/sqlproxy/sql_informer_mocks_test.go b/pkg/stores/sqlproxy/sql_informer_mocks_test.go index f32a69d5..e79c1785 100644 --- a/pkg/stores/sqlproxy/sql_informer_mocks_test.go +++ b/pkg/stores/sqlproxy/sql_informer_mocks_test.go @@ -76,6 +76,18 @@ func (mr *MockByOptionsListerMockRecorder) ListByOptions(ctx, lo, partitions, na return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByOptions", reflect.TypeOf((*MockByOptionsLister)(nil).ListByOptions), ctx, lo, partitions, namespace) } +// RunGC mocks base method. +func (m *MockByOptionsLister) RunGC(arg0 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RunGC", arg0) +} + +// RunGC indicates an expected call of RunGC. +func (mr *MockByOptionsListerMockRecorder) RunGC(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunGC", reflect.TypeOf((*MockByOptionsLister)(nil).RunGC), arg0) +} + // Watch mocks base method. func (m *MockByOptionsLister) Watch(ctx context.Context, options informer.WatchOptions, eventsCh chan<- watch.Event) error { m.ctrl.T.Helper()