diff --git a/pkg/server/cli/clicontext.go b/pkg/server/cli/clicontext.go index efede13f..62a8807a 100644 --- a/pkg/server/cli/clicontext.go +++ b/pkg/server/cli/clicontext.go @@ -2,10 +2,12 @@ package cli import ( "context" + "time" steveauth "github.com/rancher/steve/pkg/auth" authcli "github.com/rancher/steve/pkg/auth/cli" "github.com/rancher/steve/pkg/server" + "github.com/rancher/steve/pkg/sqlcache/informer/factory" "github.com/rancher/steve/pkg/ui" "github.com/rancher/wrangler/v3/pkg/kubeconfig" "github.com/rancher/wrangler/v3/pkg/ratelimit" @@ -52,6 +54,10 @@ func (c *Config) ToServer(ctx context.Context, sqlCache bool) (*server.Server, e AuthMiddleware: auth, Next: ui.New(c.UIPath), SQLCache: sqlCache, + SQLCacheFactoryOptions: factory.CacheFactoryOptions{ + GCInterval: 15 * time.Minute, + GCKeepCount: 1000, + }, }) } diff --git a/pkg/sqlcache/db/client.go b/pkg/sqlcache/db/client.go index 4e680c22..2259640d 100644 --- a/pkg/sqlcache/db/client.go +++ b/pkg/sqlcache/db/client.go @@ -63,6 +63,13 @@ type Client interface { // // The transaction is committed if f returns nil, otherwise it is rolled back. func (c *client) WithTransaction(ctx context.Context, forWriting bool, f WithTransactionFunction) error { + if err := c.withTransaction(ctx, forWriting, f); err != nil { + return fmt.Errorf("transaction: %w", err) + } + return nil +} + +func (c *client) withTransaction(ctx context.Context, forWriting bool, f WithTransactionFunction) error { c.connLock.RLock() // note: this assumes _txlock=immediate in the connection string, see NewConnection tx, err := c.conn.BeginTx(ctx, &sql.TxOptions{ @@ -70,7 +77,7 @@ func (c *client) WithTransaction(ctx context.Context, forWriting bool, f WithTra }) c.connLock.RUnlock() if err != nil { - return err + return fmt.Errorf("begin tx: %w", err) } if err = f(transaction.NewClient(tx)); err != nil { diff --git a/pkg/sqlcache/informer/factory/informer_factory.go b/pkg/sqlcache/informer/factory/informer_factory.go index 0b2e3592..5661c8a9 100644 --- a/pkg/sqlcache/informer/factory/informer_factory.go +++ b/pkg/sqlcache/informer/factory/informer_factory.go @@ -28,14 +28,18 @@ const EncryptAllEnvVar = "CATTLE_ENCRYPT_CACHE_ALL" // CacheFactory builds Informer instances and keeps a cache of instances it created type CacheFactory struct { - wg wait.Group - dbClient db.Client - stopCh chan struct{} + wg wait.Group + dbClient db.Client + + // ctx determines when informers need to stop + ctx context.Context + cancel context.CancelFunc + mutex sync.RWMutex encryptAll bool - defaultMaximumEventsCount int - perGVKMaximumEventsCount map[schema.GroupVersionKind]int + gcInterval time.Duration + gcKeepCount int newInformer newInformer @@ -48,7 +52,7 @@ type guardedInformer struct { mutex *sync.Mutex } -type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool, maxEventsCount int) (*informer.Informer, error) +type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) type Cache struct { informer.ByOptionsLister @@ -67,19 +71,10 @@ var defaultEncryptedResourceTypes = map[schema.GroupVersionKind]struct{}{ } type CacheFactoryOptions struct { - // DefaultMaximumEventsCount is the maximum number of events to keep in - // the events table by default. - // - // Use PerGVKMaximumEventsCount if you want to set a different value for - // a specific GVK. - // - // A value of 0 means no limits. - DefaultMaximumEventsCount int - // PerGVKMaximumEventsCount is the maximum number of events to keep in - // the events table for specific GVKs. - // - // A value of 0 means no limits. - PerGVKMaximumEventsCount map[schema.GroupVersionKind]int + // GCInterval is how often to run the garbage collection + GCInterval time.Duration + // GCKeepCount is how many events to keep in _events table when gc runs + GCKeepCount int } // NewCacheFactory returns an informer factory instance @@ -93,14 +88,18 @@ func NewCacheFactory(opts CacheFactoryOptions) (*CacheFactory, error) { if err != nil { return nil, err } + ctx, cancel := context.WithCancel(context.Background()) return &CacheFactory{ - wg: wait.Group{}, - stopCh: make(chan struct{}), + wg: wait.Group{}, + + ctx: ctx, + cancel: cancel, + encryptAll: os.Getenv(EncryptAllEnvVar) == "true", dbClient: dbClient, - defaultMaximumEventsCount: opts.DefaultMaximumEventsCount, - perGVKMaximumEventsCount: opts.PerGVKMaximumEventsCount, + gcInterval: opts.GCInterval, + gcKeepCount: opts.GCKeepCount, newInformer: informer.NewInformer, informers: map[schema.GroupVersionKind]*guardedInformer{}, @@ -138,15 +137,14 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external // actually create the informer if gi.informer == nil { start := time.Now() - log.Debugf("CacheFor STARTS creating informer for %v", gvk) + log.Infof("CacheFor STARTS creating informer for %v", gvk) defer func() { - log.Debugf("CacheFor IS DONE creating informer for %v (took %v)", gvk, time.Now().Sub(start)) + log.Infof("CacheFor IS DONE creating informer for %v (took %v)", gvk, time.Now().Sub(start)) }() _, encryptResourceAlways := defaultEncryptedResourceTypes[gvk] shouldEncrypt := f.encryptAll || encryptResourceAlways - maxEventsCount := f.getMaximumEventsCount(gvk) - i, err := f.newInformer(ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, maxEventsCount) + i, err := f.newInformer(f.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, f.gcInterval, f.gcKeepCount) if err != nil { return Cache{}, err } @@ -162,12 +160,12 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external return Cache{}, err } - f.wg.StartWithChannel(f.stopCh, i.Run) + f.wg.StartWithChannel(f.ctx.Done(), i.Run) gi.informer = i } - if !cache.WaitForCacheSync(f.stopCh, gi.informer.HasSynced) { + if !cache.WaitForCacheSync(f.ctx.Done(), gi.informer.HasSynced) { return Cache{}, fmt.Errorf("failed to sync SQLite Informer cache for GVK %v", gvk) } @@ -175,14 +173,7 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external return Cache{ByOptionsLister: gi.informer}, nil } -func (f *CacheFactory) getMaximumEventsCount(gvk schema.GroupVersionKind) int { - if maxCount, ok := f.perGVKMaximumEventsCount[gvk]; ok { - return maxCount - } - return f.defaultMaximumEventsCount -} - -// Reset closes the stopCh which stops any running informers, assigns a new stopCh, resets the GVK-informer cache, and resets +// Reset cancels ctx which stops any running informers, assigns a new ctx, resets the GVK-informer cache, and resets // the database connection which wipes any current sqlite database at the default location. func (f *CacheFactory) Reset() error { if f.dbClient == nil { @@ -195,8 +186,8 @@ func (f *CacheFactory) Reset() error { defer f.mutex.Unlock() // now that we are alone, stop all informers created until this point - close(f.stopCh) - f.stopCh = make(chan struct{}) + f.cancel() + f.ctx, f.cancel = context.WithCancel(context.Background()) f.wg.Wait() // and get rid of all references to those informers and their mutexes diff --git a/pkg/sqlcache/informer/factory/informer_factory_test.go b/pkg/sqlcache/informer/factory/informer_factory_test.go index 80a02d8b..f1d09c70 100644 --- a/pkg/sqlcache/informer/factory/informer_factory_test.go +++ b/pkg/sqlcache/informer/factory/informer_factory_test.go @@ -59,7 +59,7 @@ func TestCacheFor(t *testing.T) { var tests []testCase - tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, and stopCh not closed, should return no error and should call Informer.Run(). A subsequent call to CacheFor() should return same informer", test: func(t *testing.T) { + tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, and ctx not canceled, should return no error and should call Informer.Run(). A subsequent call to CacheFor() should return same informer", test: func(t *testing.T) { dbClient := NewMockClient(gomock.NewController(t)) dynamicClient := NewMockResourceInterface(gomock.NewController(t)) fields := [][]string{{"something"}} @@ -75,27 +75,27 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) - assert.Equal(t, 0, maxEventsCount) + assert.Equal(t, 0, gcKeepCount) assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ dbClient: dbClient, - stopCh: make(chan struct{}), newInformer: testNewInformer, informers: map[schema.GroupVersionKind]*guardedInformer{}, } + f.ctx, f.cancel = context.WithCancel(context.Background()) go func() { - // this function ensures that stopCh is open for the duration of this test but if part of a longer process it will be closed eventually + // this function ensures that ctx is open for the duration of this test but if part of a longer process it will be closed eventually time.Sleep(5 * time.Second) - close(f.stopCh) + f.cancel() }() var c Cache var err error @@ -108,7 +108,7 @@ func TestCacheFor(t *testing.T) { assert.Nil(t, err) assert.Equal(t, c, c2) }}) - tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning false, and stopCh not closed, should call Run() and return an error", test: func(t *testing.T) { + tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning false, and ctx not canceled, should call Run() and return an error", test: func(t *testing.T) { dbClient := NewMockClient(gomock.NewController(t)) dynamicClient := NewMockResourceInterface(gomock.NewController(t)) fields := [][]string{{"something"}} @@ -122,33 +122,33 @@ func TestCacheFor(t *testing.T) { // need to set this so Run function is not nil SharedIndexInformer: sii, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) - assert.Equal(t, 0, maxEventsCount) + assert.Equal(t, 0, gcKeepCount) assert.Nil(t, externalUpdateInfo) return expectedI, nil } f := &CacheFactory{ dbClient: dbClient, - stopCh: make(chan struct{}), newInformer: testNewInformer, informers: map[schema.GroupVersionKind]*guardedInformer{}, } + f.ctx, f.cancel = context.WithCancel(context.Background()) go func() { time.Sleep(1 * time.Second) - close(f.stopCh) + f.cancel() }() var err error _, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.NotNil(t, err) time.Sleep(2 * time.Second) }}) - tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, and stopCh closed, should not call Run() more than once and not return an error", test: func(t *testing.T) { + tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, and ctx is canceled, should not call Run() more than once and not return an error", test: func(t *testing.T) { dbClient := NewMockClient(gomock.NewController(t)) dynamicClient := NewMockResourceInterface(gomock.NewController(t)) fields := [][]string{{"something"}} @@ -166,24 +166,24 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) - assert.Equal(t, 0, maxEventsCount) + assert.Equal(t, 0, gcKeepCount) assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ dbClient: dbClient, - stopCh: make(chan struct{}), newInformer: testNewInformer, informers: map[schema.GroupVersionKind]*guardedInformer{}, } + f.ctx, f.cancel = context.WithCancel(context.Background()) + f.cancel() - close(f.stopCh) var c Cache var err error c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) @@ -207,27 +207,27 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) - assert.Equal(t, 0, maxEventsCount) + assert.Equal(t, 0, gcKeepCount) assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ dbClient: dbClient, - stopCh: make(chan struct{}), newInformer: testNewInformer, encryptAll: true, informers: map[schema.GroupVersionKind]*guardedInformer{}, } + f.ctx, f.cancel = context.WithCancel(context.Background()) go func() { time.Sleep(10 * time.Second) - close(f.stopCh) + f.cancel() }() var c Cache var err error @@ -257,27 +257,27 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) - assert.Equal(t, 0, maxEventsCount) + assert.Equal(t, 0, gcKeepCount) assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ dbClient: dbClient, - stopCh: make(chan struct{}), newInformer: testNewInformer, encryptAll: false, informers: map[schema.GroupVersionKind]*guardedInformer{}, } + f.ctx, f.cancel = context.WithCancel(context.Background()) go func() { time.Sleep(10 * time.Second) - close(f.stopCh) + f.cancel() }() var c Cache var err error @@ -306,27 +306,27 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) - assert.Equal(t, 0, maxEventsCount) + assert.Equal(t, 0, gcKeepCount) assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ dbClient: dbClient, - stopCh: make(chan struct{}), newInformer: testNewInformer, encryptAll: false, informers: map[schema.GroupVersionKind]*guardedInformer{}, } + f.ctx, f.cancel = context.WithCancel(context.Background()) go func() { time.Sleep(10 * time.Second) - close(f.stopCh) + f.cancel() }() var c Cache var err error @@ -336,7 +336,7 @@ func TestCacheFor(t *testing.T) { time.Sleep(1 * time.Second) }}) - tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, stopCh not closed, and transform func should return no error", test: func(t *testing.T) { + tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, ctx not canceled, and transform func should return no error", test: func(t *testing.T) { dbClient := NewMockClient(gomock.NewController(t)) dynamicClient := NewMockResourceInterface(gomock.NewController(t)) fields := [][]string{{"something"}} @@ -355,7 +355,7 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { // we can't test func == func, so instead we check if the output was as expected input := "someinput" ouput, err := transform(input) @@ -369,21 +369,21 @@ func TestCacheFor(t *testing.T) { assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) - assert.Equal(t, 0, maxEventsCount) + assert.Equal(t, 0, gcKeepCount) assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ dbClient: dbClient, - stopCh: make(chan struct{}), newInformer: testNewInformer, informers: map[schema.GroupVersionKind]*guardedInformer{}, } + f.ctx, f.cancel = context.WithCancel(context.Background()) go func() { - // this function ensures that stopCh is open for the duration of this test but if part of a longer process it will be closed eventually + // this function ensures that ctx is not canceled for the duration of this test but if part of a longer process it will be closed eventually time.Sleep(5 * time.Second) - close(f.stopCh) + f.cancel() }() var c Cache var err error @@ -408,85 +408,34 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) - assert.Equal(t, 10, maxEventsCount) + assert.Equal(t, 5*time.Second, gcInterval) + assert.Equal(t, 10, gcKeepCount) assert.Nil(t, externalUpdateInfo) return i, nil } f := &CacheFactory{ - defaultMaximumEventsCount: 10, - dbClient: dbClient, - stopCh: make(chan struct{}), - newInformer: testNewInformer, - encryptAll: true, - informers: map[schema.GroupVersionKind]*guardedInformer{}, - } - - go func() { - time.Sleep(10 * time.Second) - close(f.stopCh) - }() - var c Cache - var err error - // CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) - c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) - assert.Nil(t, err) - assert.Equal(t, expectedC, c) - time.Sleep(1 * time.Second) - }}) - tests = append(tests, testCase{description: "CacheFor() with per GVK maximum events count", test: func(t *testing.T) { - dbClient := NewMockClient(gomock.NewController(t)) - dynamicClient := NewMockResourceInterface(gomock.NewController(t)) - fields := [][]string{{"something"}} - expectedGVK := schema.GroupVersionKind{ - Group: "management.cattle.io", - Version: "v3", - Kind: "Token", - } - sii := NewMockSharedIndexInformer(gomock.NewController(t)) - sii.EXPECT().HasSynced().Return(true) - sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes() - sii.EXPECT().SetWatchErrorHandler(gomock.Any()) - i := &informer.Informer{ - // need to set this so Run function is not nil - SharedIndexInformer: sii, - } - expectedC := Cache{ - ByOptionsLister: i, - } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { - assert.Equal(t, client, dynamicClient) - assert.Equal(t, fields, fields) - assert.Equal(t, expectedGVK, gvk) - assert.Equal(t, db, dbClient) - assert.Equal(t, true, shouldEncrypt) - assert.Equal(t, 10, maxEventsCount) - assert.Nil(t, externalUpdateInfo) - return i, nil - } - f := &CacheFactory{ - defaultMaximumEventsCount: 5, - perGVKMaximumEventsCount: map[schema.GroupVersionKind]int{ - expectedGVK: 10, - }, + gcInterval: 5 * time.Second, + gcKeepCount: 10, dbClient: dbClient, - stopCh: make(chan struct{}), newInformer: testNewInformer, encryptAll: true, informers: map[schema.GroupVersionKind]*guardedInformer{}, } + f.ctx, f.cancel = context.WithCancel(context.Background()) go func() { time.Sleep(10 * time.Second) - close(f.stopCh) + f.cancel() }() var c Cache var err error + // CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) diff --git a/pkg/sqlcache/informer/informer.go b/pkg/sqlcache/informer/informer.go index 87d953a7..b0a8b703 100644 --- a/pkg/sqlcache/informer/informer.go +++ b/pkg/sqlcache/informer/informer.go @@ -55,7 +55,7 @@ var newInformer = cache.NewSharedIndexInformer // NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form // using the specified client -func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*Informer, error) { +func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*Informer, error) { watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { return client.Watch(ctx, options) } @@ -118,9 +118,10 @@ func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [ } opts := ListOptionIndexerOptions{ - Fields: fields, - IsNamespaced: namespaced, - MaximumEventsCount: maxEventsCount, + Fields: fields, + IsNamespaced: namespaced, + GCInterval: gcInterval, + GCKeepCount: gcKeepCount, } loi, err := NewListOptionIndexer(ctx, s, opts) if err != nil { diff --git a/pkg/sqlcache/informer/informer_test.go b/pkg/sqlcache/informer/informer_test.go index 63be28af..c6ca2f68 100644 --- a/pkg/sqlcache/informer/informer_test.go +++ b/pkg/sqlcache/informer/informer_test.go @@ -81,7 +81,7 @@ func TestNewInformer(t *testing.T) { } }) - informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0) + informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0, 0) assert.Nil(t, err) assert.NotNil(t, informer.ByOptionsLister) assert.NotNil(t, informer.SharedIndexInformer) @@ -105,7 +105,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0, 0) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with errors returned from NewIndexer(), should return an error", test: func(t *testing.T) { @@ -140,7 +140,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0, 0) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with errors returned from NewListOptionIndexer(), should return an error", test: func(t *testing.T) { @@ -193,7 +193,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0, 0) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with transform func", test: func(t *testing.T) { @@ -257,7 +257,7 @@ func TestNewInformer(t *testing.T) { transformFunc := func(input interface{}) (interface{}, error) { return "someoutput", nil } - informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, transformFunc, gvk, dbClient, false, true, true, 0) + informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, transformFunc, gvk, dbClient, false, true, true, 0, 0) assert.Nil(t, err) assert.NotNil(t, informer.ByOptionsLister) assert.NotNil(t, informer.SharedIndexInformer) @@ -293,7 +293,7 @@ func TestNewInformer(t *testing.T) { transformFunc := func(input interface{}) (interface{}, error) { return "someoutput", nil } - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, transformFunc, gvk, dbClient, false, true, true, 0) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, transformFunc, gvk, dbClient, false, true, true, 0, 0) assert.Error(t, err) newInformer = cache.NewSharedIndexInformer }}) diff --git a/pkg/sqlcache/informer/listoption_indexer.go b/pkg/sqlcache/informer/listoption_indexer.go index a00e03be..a2abeb32 100644 --- a/pkg/sqlcache/informer/listoption_indexer.go +++ b/pkg/sqlcache/informer/listoption_indexer.go @@ -15,6 +15,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/rancher/steve/pkg/sqlcache/db/transaction" "github.com/rancher/steve/pkg/sqlcache/sqltypes" @@ -37,9 +38,6 @@ type ListOptionIndexer struct { namespaced bool indexedFields []string - // maximumEventsCount is how many events to keep. 0 means keep all events. - maximumEventsCount int - latestRVLock sync.RWMutex latestRV string @@ -137,11 +135,10 @@ type ListOptionIndexerOptions struct { // IsNamespaced determines whether the GVK for this ListOptionIndexer is // namespaced IsNamespaced bool - // MaximumEventsCount is the maximum number of events we want to keep - // in the _events table. - // - // Zero means never delete events. - MaximumEventsCount int + // GCInterval is how often to run the garbage collection + GCInterval time.Duration + // GCKeepCount is how many events to keep in _events table when gc runs + GCKeepCount int } // NewListOptionIndexer returns a SQLite-backed cache.Indexer of unstructured.Unstructured Kubernetes resources of a certain GVK @@ -168,24 +165,20 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp } l := &ListOptionIndexer{ - Indexer: i, - namespaced: opts.IsNamespaced, - indexedFields: indexedFields, - maximumEventsCount: opts.MaximumEventsCount, - watchers: make(map[*watchKey]*watcher), + Indexer: i, + namespaced: opts.IsNamespaced, + indexedFields: indexedFields, + watchers: make(map[*watchKey]*watcher), } l.RegisterAfterAdd(l.addIndexFields) l.RegisterAfterAdd(l.addLabels) l.RegisterAfterAdd(l.notifyEventAdded) - l.RegisterAfterAdd(l.deleteOldEvents) l.RegisterAfterUpdate(l.addIndexFields) l.RegisterAfterUpdate(l.addLabels) l.RegisterAfterUpdate(l.notifyEventModified) - l.RegisterAfterUpdate(l.deleteOldEvents) l.RegisterAfterDelete(l.deleteFieldsByKey) l.RegisterAfterDelete(l.deleteLabelsByKey) l.RegisterAfterDelete(l.notifyEventDeleted) - l.RegisterAfterDelete(l.deleteOldEvents) l.RegisterAfterDeleteAll(l.deleteFields) l.RegisterAfterDeleteAll(l.deleteLabels) columnDefs := make([]string, len(indexedFields)) @@ -206,16 +199,18 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp return &db.QueryError{QueryString: createEventsTableFmt, Err: err} } - _, err = tx.Exec(fmt.Sprintf(createFieldsTableFmt, dbName, strings.Join(columnDefs, ", "))) + createFieldsTableQuery := fmt.Sprintf(createFieldsTableFmt, dbName, strings.Join(columnDefs, ", ")) + _, err = tx.Exec(createFieldsTableQuery) if err != nil { - return err + return &db.QueryError{QueryString: createFieldsTableQuery, Err: err} } for index, field := range indexedFields { // create index for field - _, err = tx.Exec(fmt.Sprintf(createFieldsIndexFmt, dbName, field, dbName, field)) + createFieldsIndexQuery := fmt.Sprintf(createFieldsIndexFmt, dbName, field, dbName, field) + _, err = tx.Exec(createFieldsIndexQuery) if err != nil { - return err + return &db.QueryError{QueryString: createFieldsIndexQuery, Err: err} } // format field into column for prepared statement @@ -283,6 +278,8 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp l.deleteLabelsByKeyStmt = l.Prepare(l.deleteLabelsByKeyQuery) l.deleteLabelsStmt = l.Prepare(l.deleteLabelsQuery) + go l.runGC(ctx, opts.GCInterval, opts.GCKeepCount) + return l, nil } @@ -479,18 +476,6 @@ func (l *ListOptionIndexer) notifyEvent(eventType watch.EventType, oldObj any, o return nil } -func (l *ListOptionIndexer) deleteOldEvents(key string, obj any, tx transaction.Client) error { - if l.maximumEventsCount == 0 { - return nil - } - - _, err := tx.Stmt(l.deleteEventsByCountStmt).Exec(l.maximumEventsCount) - if err != nil { - return &db.QueryError{QueryString: l.deleteEventsByCountQuery, Err: err} - } - return nil -} - // addIndexFields saves sortable/filterable fields into tables func (l *ListOptionIndexer) addIndexFields(key string, obj any, tx transaction.Client) error { args := []any{key} @@ -817,7 +802,7 @@ func (l *ListOptionIndexer) executeQuery(ctx context.Context, queryInfo *QueryIn } items, err = l.ReadObjects(rows, l.GetType(), l.GetShouldEncrypt()) if err != nil { - return err + return fmt.Errorf("read objects: %w", err) } total = len(items) @@ -1379,3 +1364,32 @@ func matchFilter(filterName string, filterNamespace string, filterSelector label } return true } + +func (l *ListOptionIndexer) runGC(ctx context.Context, interval time.Duration, keepCount int) { + if interval == 0 || keepCount == 0 { + return + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + logrus.Infof("Started SQL cache garbage collection for %s (interval=%s, keep=%d)", l.GetName(), interval, keepCount) + + for { + select { + case <-ticker.C: + err := l.WithTransaction(ctx, true, func(tx transaction.Client) error { + _, err := tx.Stmt(l.deleteEventsByCountStmt).Exec(keepCount) + if err != nil { + return &db.QueryError{QueryString: l.deleteEventsByCountQuery, Err: err} + } + return nil + }) + if err != nil { + logrus.Errorf("garbage collection for %s: %v", l.GetName(), err) + } + case <-ctx.Done(): + return + } + } +} diff --git a/pkg/sqlcache/informer/listoption_indexer_test.go b/pkg/sqlcache/informer/listoption_indexer_test.go index 4d489710..bb5156a9 100644 --- a/pkg/sqlcache/informer/listoption_indexer_test.go +++ b/pkg/sqlcache/informer/listoption_indexer_test.go @@ -98,9 +98,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) // create events table @@ -175,9 +175,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) store.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(fmt.Errorf("error")) @@ -210,9 +210,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) @@ -255,9 +255,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) @@ -304,9 +304,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) @@ -2745,7 +2745,8 @@ func TestWatchGarbageCollection(t *testing.T) { parentCtx := context.Background() opts := ListOptionIndexerOptions{ - MaximumEventsCount: 2, + GCInterval: 40 * time.Millisecond, + GCKeepCount: 2, } loi, dbPath, err := makeListOptionIndexer(parentCtx, opts) defer cleanTempFiles(dbPath) @@ -2774,6 +2775,9 @@ func TestWatchGarbageCollection(t *testing.T) { assert.NoError(t, err) rv4 := getRV(t) + // Make sure GC runs + time.Sleep(2 * opts.GCInterval) + for _, rv := range []string{rv1, rv2} { watcherCh, errCh := startWatcher(parentCtx, loi, rv) gotEvents := receiveEvents(watcherCh) @@ -2811,6 +2815,9 @@ func TestWatchGarbageCollection(t *testing.T) { assert.NoError(t, err) rv5 := getRV(t) + // Make sure GC runs + time.Sleep(2 * opts.GCInterval) + for _, rv := range []string{rv1, rv2, rv3} { watcherCh, errCh := startWatcher(parentCtx, loi, rv) gotEvents := receiveEvents(watcherCh) diff --git a/pkg/stores/sqlproxy/proxy_store.go b/pkg/stores/sqlproxy/proxy_store.go index 9e0c74ea..723a1682 100644 --- a/pkg/stores/sqlproxy/proxy_store.go +++ b/pkg/stores/sqlproxy/proxy_store.go @@ -317,7 +317,7 @@ func (s *Store) Reset() error { s.lock.Lock() defer s.lock.Unlock() if err := s.cacheFactory.Reset(); err != nil { - return err + return fmt.Errorf("reset: %w", err) } if err := s.initializeNamespaceCache(); err != nil { @@ -784,7 +784,7 @@ func (s *Store) ListByPartitions(apiOp *types.APIRequest, apiSchema *types.APISc ns := attributes.Namespaced(apiSchema) inf, err := s.cacheFactory.CacheFor(s.ctx, fields, externalGVKDependencies[gvk], selfGVKDependencies[gvk], transformFunc, tableClient, gvk, ns, controllerschema.IsListWatchable(apiSchema)) if err != nil { - return nil, 0, "", err + return nil, 0, "", fmt.Errorf("cachefor %v: %w", gvk, err) } if gvk.Group == "ext.cattle.io" && (gvk.Kind == "Token" || gvk.Kind == "Kubeconfig") { accessSet := accesscontrol.AccessSetFromAPIRequest(apiOp) @@ -814,7 +814,7 @@ func (s *Store) ListByPartitions(apiOp *types.APIRequest, apiSchema *types.APISc if errors.Is(err, informer.ErrInvalidColumn) { return nil, 0, "", apierror.NewAPIError(validation.InvalidBodyContent, err.Error()) } - return nil, 0, "", err + return nil, 0, "", fmt.Errorf("listbyoptions %v: %w", gvk, err) } return list, total, continueToken, nil