1
0
mirror of https://github.com/rancher/steve.git synced 2025-09-08 18:59:58 +00:00

Better gc (#717)

* More error wrapping for SQL cache

* Use context.Context instead of stopCh

* Move CacheFor to Info log level

* Implement time-based GC

* Set default GC param when running standalone steve
This commit is contained in:
Tom Lebreux
2025-07-11 12:48:19 -04:00
committed by GitHub
parent 3692666375
commit 127d37391d
9 changed files with 170 additions and 195 deletions

View File

@@ -2,10 +2,12 @@ package cli
import (
"context"
"time"
steveauth "github.com/rancher/steve/pkg/auth"
authcli "github.com/rancher/steve/pkg/auth/cli"
"github.com/rancher/steve/pkg/server"
"github.com/rancher/steve/pkg/sqlcache/informer/factory"
"github.com/rancher/steve/pkg/ui"
"github.com/rancher/wrangler/v3/pkg/kubeconfig"
"github.com/rancher/wrangler/v3/pkg/ratelimit"
@@ -52,6 +54,10 @@ func (c *Config) ToServer(ctx context.Context, sqlCache bool) (*server.Server, e
AuthMiddleware: auth,
Next: ui.New(c.UIPath),
SQLCache: sqlCache,
SQLCacheFactoryOptions: factory.CacheFactoryOptions{
GCInterval: 15 * time.Minute,
GCKeepCount: 1000,
},
})
}

View File

@@ -63,6 +63,13 @@ type Client interface {
//
// The transaction is committed if f returns nil, otherwise it is rolled back.
func (c *client) WithTransaction(ctx context.Context, forWriting bool, f WithTransactionFunction) error {
if err := c.withTransaction(ctx, forWriting, f); err != nil {
return fmt.Errorf("transaction: %w", err)
}
return nil
}
func (c *client) withTransaction(ctx context.Context, forWriting bool, f WithTransactionFunction) error {
c.connLock.RLock()
// note: this assumes _txlock=immediate in the connection string, see NewConnection
tx, err := c.conn.BeginTx(ctx, &sql.TxOptions{
@@ -70,7 +77,7 @@ func (c *client) WithTransaction(ctx context.Context, forWriting bool, f WithTra
})
c.connLock.RUnlock()
if err != nil {
return err
return fmt.Errorf("begin tx: %w", err)
}
if err = f(transaction.NewClient(tx)); err != nil {

View File

@@ -28,14 +28,18 @@ const EncryptAllEnvVar = "CATTLE_ENCRYPT_CACHE_ALL"
// CacheFactory builds Informer instances and keeps a cache of instances it created
type CacheFactory struct {
wg wait.Group
dbClient db.Client
stopCh chan struct{}
wg wait.Group
dbClient db.Client
// ctx determines when informers need to stop
ctx context.Context
cancel context.CancelFunc
mutex sync.RWMutex
encryptAll bool
defaultMaximumEventsCount int
perGVKMaximumEventsCount map[schema.GroupVersionKind]int
gcInterval time.Duration
gcKeepCount int
newInformer newInformer
@@ -48,7 +52,7 @@ type guardedInformer struct {
mutex *sync.Mutex
}
type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool, maxEventsCount int) (*informer.Informer, error)
type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error)
type Cache struct {
informer.ByOptionsLister
@@ -67,19 +71,10 @@ var defaultEncryptedResourceTypes = map[schema.GroupVersionKind]struct{}{
}
type CacheFactoryOptions struct {
// DefaultMaximumEventsCount is the maximum number of events to keep in
// the events table by default.
//
// Use PerGVKMaximumEventsCount if you want to set a different value for
// a specific GVK.
//
// A value of 0 means no limits.
DefaultMaximumEventsCount int
// PerGVKMaximumEventsCount is the maximum number of events to keep in
// the events table for specific GVKs.
//
// A value of 0 means no limits.
PerGVKMaximumEventsCount map[schema.GroupVersionKind]int
// GCInterval is how often to run the garbage collection
GCInterval time.Duration
// GCKeepCount is how many events to keep in _events table when gc runs
GCKeepCount int
}
// NewCacheFactory returns an informer factory instance
@@ -93,14 +88,18 @@ func NewCacheFactory(opts CacheFactoryOptions) (*CacheFactory, error) {
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
return &CacheFactory{
wg: wait.Group{},
stopCh: make(chan struct{}),
wg: wait.Group{},
ctx: ctx,
cancel: cancel,
encryptAll: os.Getenv(EncryptAllEnvVar) == "true",
dbClient: dbClient,
defaultMaximumEventsCount: opts.DefaultMaximumEventsCount,
perGVKMaximumEventsCount: opts.PerGVKMaximumEventsCount,
gcInterval: opts.GCInterval,
gcKeepCount: opts.GCKeepCount,
newInformer: informer.NewInformer,
informers: map[schema.GroupVersionKind]*guardedInformer{},
@@ -138,15 +137,14 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external
// actually create the informer
if gi.informer == nil {
start := time.Now()
log.Debugf("CacheFor STARTS creating informer for %v", gvk)
log.Infof("CacheFor STARTS creating informer for %v", gvk)
defer func() {
log.Debugf("CacheFor IS DONE creating informer for %v (took %v)", gvk, time.Now().Sub(start))
log.Infof("CacheFor IS DONE creating informer for %v (took %v)", gvk, time.Now().Sub(start))
}()
_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
shouldEncrypt := f.encryptAll || encryptResourceAlways
maxEventsCount := f.getMaximumEventsCount(gvk)
i, err := f.newInformer(ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, maxEventsCount)
i, err := f.newInformer(f.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, f.gcInterval, f.gcKeepCount)
if err != nil {
return Cache{}, err
}
@@ -162,12 +160,12 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external
return Cache{}, err
}
f.wg.StartWithChannel(f.stopCh, i.Run)
f.wg.StartWithChannel(f.ctx.Done(), i.Run)
gi.informer = i
}
if !cache.WaitForCacheSync(f.stopCh, gi.informer.HasSynced) {
if !cache.WaitForCacheSync(f.ctx.Done(), gi.informer.HasSynced) {
return Cache{}, fmt.Errorf("failed to sync SQLite Informer cache for GVK %v", gvk)
}
@@ -175,14 +173,7 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external
return Cache{ByOptionsLister: gi.informer}, nil
}
func (f *CacheFactory) getMaximumEventsCount(gvk schema.GroupVersionKind) int {
if maxCount, ok := f.perGVKMaximumEventsCount[gvk]; ok {
return maxCount
}
return f.defaultMaximumEventsCount
}
// Reset closes the stopCh which stops any running informers, assigns a new stopCh, resets the GVK-informer cache, and resets
// Reset cancels ctx which stops any running informers, assigns a new ctx, resets the GVK-informer cache, and resets
// the database connection which wipes any current sqlite database at the default location.
func (f *CacheFactory) Reset() error {
if f.dbClient == nil {
@@ -195,8 +186,8 @@ func (f *CacheFactory) Reset() error {
defer f.mutex.Unlock()
// now that we are alone, stop all informers created until this point
close(f.stopCh)
f.stopCh = make(chan struct{})
f.cancel()
f.ctx, f.cancel = context.WithCancel(context.Background())
f.wg.Wait()
// and get rid of all references to those informers and their mutexes

View File

@@ -59,7 +59,7 @@ func TestCacheFor(t *testing.T) {
var tests []testCase
tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, and stopCh not closed, should return no error and should call Informer.Run(). A subsequent call to CacheFor() should return same informer", test: func(t *testing.T) {
tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, and ctx not canceled, should return no error and should call Informer.Run(). A subsequent call to CacheFor() should return same informer", test: func(t *testing.T) {
dbClient := NewMockClient(gomock.NewController(t))
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
fields := [][]string{{"something"}}
@@ -75,27 +75,27 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, false, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
assert.Equal(t, 0, gcKeepCount)
assert.Nil(t, externalUpdateInfo)
return i, nil
}
f := &CacheFactory{
dbClient: dbClient,
stopCh: make(chan struct{}),
newInformer: testNewInformer,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}
f.ctx, f.cancel = context.WithCancel(context.Background())
go func() {
// this function ensures that stopCh is open for the duration of this test but if part of a longer process it will be closed eventually
// this function ensures that ctx is open for the duration of this test but if part of a longer process it will be closed eventually
time.Sleep(5 * time.Second)
close(f.stopCh)
f.cancel()
}()
var c Cache
var err error
@@ -108,7 +108,7 @@ func TestCacheFor(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, c, c2)
}})
tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning false, and stopCh not closed, should call Run() and return an error", test: func(t *testing.T) {
tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning false, and ctx not canceled, should call Run() and return an error", test: func(t *testing.T) {
dbClient := NewMockClient(gomock.NewController(t))
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
fields := [][]string{{"something"}}
@@ -122,33 +122,33 @@ func TestCacheFor(t *testing.T) {
// need to set this so Run function is not nil
SharedIndexInformer: sii,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, false, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
assert.Equal(t, 0, gcKeepCount)
assert.Nil(t, externalUpdateInfo)
return expectedI, nil
}
f := &CacheFactory{
dbClient: dbClient,
stopCh: make(chan struct{}),
newInformer: testNewInformer,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}
f.ctx, f.cancel = context.WithCancel(context.Background())
go func() {
time.Sleep(1 * time.Second)
close(f.stopCh)
f.cancel()
}()
var err error
_, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
assert.NotNil(t, err)
time.Sleep(2 * time.Second)
}})
tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, and stopCh closed, should not call Run() more than once and not return an error", test: func(t *testing.T) {
tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, and ctx is canceled, should not call Run() more than once and not return an error", test: func(t *testing.T) {
dbClient := NewMockClient(gomock.NewController(t))
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
fields := [][]string{{"something"}}
@@ -166,24 +166,24 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, false, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
assert.Equal(t, 0, gcKeepCount)
assert.Nil(t, externalUpdateInfo)
return i, nil
}
f := &CacheFactory{
dbClient: dbClient,
stopCh: make(chan struct{}),
newInformer: testNewInformer,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}
f.ctx, f.cancel = context.WithCancel(context.Background())
f.cancel()
close(f.stopCh)
var c Cache
var err error
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
@@ -207,27 +207,27 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, true, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
assert.Equal(t, 0, gcKeepCount)
assert.Nil(t, externalUpdateInfo)
return i, nil
}
f := &CacheFactory{
dbClient: dbClient,
stopCh: make(chan struct{}),
newInformer: testNewInformer,
encryptAll: true,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}
f.ctx, f.cancel = context.WithCancel(context.Background())
go func() {
time.Sleep(10 * time.Second)
close(f.stopCh)
f.cancel()
}()
var c Cache
var err error
@@ -257,27 +257,27 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, true, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
assert.Equal(t, 0, gcKeepCount)
assert.Nil(t, externalUpdateInfo)
return i, nil
}
f := &CacheFactory{
dbClient: dbClient,
stopCh: make(chan struct{}),
newInformer: testNewInformer,
encryptAll: false,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}
f.ctx, f.cancel = context.WithCancel(context.Background())
go func() {
time.Sleep(10 * time.Second)
close(f.stopCh)
f.cancel()
}()
var c Cache
var err error
@@ -306,27 +306,27 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, true, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
assert.Equal(t, 0, gcKeepCount)
assert.Nil(t, externalUpdateInfo)
return i, nil
}
f := &CacheFactory{
dbClient: dbClient,
stopCh: make(chan struct{}),
newInformer: testNewInformer,
encryptAll: false,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}
f.ctx, f.cancel = context.WithCancel(context.Background())
go func() {
time.Sleep(10 * time.Second)
close(f.stopCh)
f.cancel()
}()
var c Cache
var err error
@@ -336,7 +336,7 @@ func TestCacheFor(t *testing.T) {
time.Sleep(1 * time.Second)
}})
tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, stopCh not closed, and transform func should return no error", test: func(t *testing.T) {
tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, ctx not canceled, and transform func should return no error", test: func(t *testing.T) {
dbClient := NewMockClient(gomock.NewController(t))
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
fields := [][]string{{"something"}}
@@ -355,7 +355,7 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
// we can't test func == func, so instead we check if the output was as expected
input := "someinput"
ouput, err := transform(input)
@@ -369,21 +369,21 @@ func TestCacheFor(t *testing.T) {
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, false, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
assert.Equal(t, 0, gcKeepCount)
assert.Nil(t, externalUpdateInfo)
return i, nil
}
f := &CacheFactory{
dbClient: dbClient,
stopCh: make(chan struct{}),
newInformer: testNewInformer,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}
f.ctx, f.cancel = context.WithCancel(context.Background())
go func() {
// this function ensures that stopCh is open for the duration of this test but if part of a longer process it will be closed eventually
// this function ensures that ctx is not canceled for the duration of this test but if part of a longer process it will be closed eventually
time.Sleep(5 * time.Second)
close(f.stopCh)
f.cancel()
}()
var c Cache
var err error
@@ -408,85 +408,34 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, true, shouldEncrypt)
assert.Equal(t, 10, maxEventsCount)
assert.Equal(t, 5*time.Second, gcInterval)
assert.Equal(t, 10, gcKeepCount)
assert.Nil(t, externalUpdateInfo)
return i, nil
}
f := &CacheFactory{
defaultMaximumEventsCount: 10,
dbClient: dbClient,
stopCh: make(chan struct{}),
newInformer: testNewInformer,
encryptAll: true,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}
go func() {
time.Sleep(10 * time.Second)
close(f.stopCh)
}()
var c Cache
var err error
// CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool)
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
assert.Nil(t, err)
assert.Equal(t, expectedC, c)
time.Sleep(1 * time.Second)
}})
tests = append(tests, testCase{description: "CacheFor() with per GVK maximum events count", test: func(t *testing.T) {
dbClient := NewMockClient(gomock.NewController(t))
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
fields := [][]string{{"something"}}
expectedGVK := schema.GroupVersionKind{
Group: "management.cattle.io",
Version: "v3",
Kind: "Token",
}
sii := NewMockSharedIndexInformer(gomock.NewController(t))
sii.EXPECT().HasSynced().Return(true)
sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes()
sii.EXPECT().SetWatchErrorHandler(gomock.Any())
i := &informer.Informer{
// need to set this so Run function is not nil
SharedIndexInformer: sii,
}
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, true, shouldEncrypt)
assert.Equal(t, 10, maxEventsCount)
assert.Nil(t, externalUpdateInfo)
return i, nil
}
f := &CacheFactory{
defaultMaximumEventsCount: 5,
perGVKMaximumEventsCount: map[schema.GroupVersionKind]int{
expectedGVK: 10,
},
gcInterval: 5 * time.Second,
gcKeepCount: 10,
dbClient: dbClient,
stopCh: make(chan struct{}),
newInformer: testNewInformer,
encryptAll: true,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}
f.ctx, f.cancel = context.WithCancel(context.Background())
go func() {
time.Sleep(10 * time.Second)
close(f.stopCh)
f.cancel()
}()
var c Cache
var err error
// CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool)
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
assert.Nil(t, err)
assert.Equal(t, expectedC, c)

View File

@@ -55,7 +55,7 @@ var newInformer = cache.NewSharedIndexInformer
// NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form
// using the specified client
func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*Informer, error) {
func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*Informer, error) {
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
return client.Watch(ctx, options)
}
@@ -118,9 +118,10 @@ func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [
}
opts := ListOptionIndexerOptions{
Fields: fields,
IsNamespaced: namespaced,
MaximumEventsCount: maxEventsCount,
Fields: fields,
IsNamespaced: namespaced,
GCInterval: gcInterval,
GCKeepCount: gcKeepCount,
}
loi, err := NewListOptionIndexer(ctx, s, opts)
if err != nil {

View File

@@ -81,7 +81,7 @@ func TestNewInformer(t *testing.T) {
}
})
informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0)
informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0, 0)
assert.Nil(t, err)
assert.NotNil(t, informer.ByOptionsLister)
assert.NotNil(t, informer.SharedIndexInformer)
@@ -105,7 +105,7 @@ func TestNewInformer(t *testing.T) {
}
})
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0)
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0, 0)
assert.NotNil(t, err)
}})
tests = append(tests, testCase{description: "NewInformer() with errors returned from NewIndexer(), should return an error", test: func(t *testing.T) {
@@ -140,7 +140,7 @@ func TestNewInformer(t *testing.T) {
}
})
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0)
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0, 0)
assert.NotNil(t, err)
}})
tests = append(tests, testCase{description: "NewInformer() with errors returned from NewListOptionIndexer(), should return an error", test: func(t *testing.T) {
@@ -193,7 +193,7 @@ func TestNewInformer(t *testing.T) {
}
})
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0)
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, nil, gvk, dbClient, false, true, true, 0, 0)
assert.NotNil(t, err)
}})
tests = append(tests, testCase{description: "NewInformer() with transform func", test: func(t *testing.T) {
@@ -257,7 +257,7 @@ func TestNewInformer(t *testing.T) {
transformFunc := func(input interface{}) (interface{}, error) {
return "someoutput", nil
}
informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, transformFunc, gvk, dbClient, false, true, true, 0)
informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, transformFunc, gvk, dbClient, false, true, true, 0, 0)
assert.Nil(t, err)
assert.NotNil(t, informer.ByOptionsLister)
assert.NotNil(t, informer.SharedIndexInformer)
@@ -293,7 +293,7 @@ func TestNewInformer(t *testing.T) {
transformFunc := func(input interface{}) (interface{}, error) {
return "someoutput", nil
}
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, transformFunc, gvk, dbClient, false, true, true, 0)
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, nil, transformFunc, gvk, dbClient, false, true, true, 0, 0)
assert.Error(t, err)
newInformer = cache.NewSharedIndexInformer
}})

View File

@@ -15,6 +15,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/rancher/steve/pkg/sqlcache/db/transaction"
"github.com/rancher/steve/pkg/sqlcache/sqltypes"
@@ -37,9 +38,6 @@ type ListOptionIndexer struct {
namespaced bool
indexedFields []string
// maximumEventsCount is how many events to keep. 0 means keep all events.
maximumEventsCount int
latestRVLock sync.RWMutex
latestRV string
@@ -137,11 +135,10 @@ type ListOptionIndexerOptions struct {
// IsNamespaced determines whether the GVK for this ListOptionIndexer is
// namespaced
IsNamespaced bool
// MaximumEventsCount is the maximum number of events we want to keep
// in the _events table.
//
// Zero means never delete events.
MaximumEventsCount int
// GCInterval is how often to run the garbage collection
GCInterval time.Duration
// GCKeepCount is how many events to keep in _events table when gc runs
GCKeepCount int
}
// NewListOptionIndexer returns a SQLite-backed cache.Indexer of unstructured.Unstructured Kubernetes resources of a certain GVK
@@ -168,24 +165,20 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp
}
l := &ListOptionIndexer{
Indexer: i,
namespaced: opts.IsNamespaced,
indexedFields: indexedFields,
maximumEventsCount: opts.MaximumEventsCount,
watchers: make(map[*watchKey]*watcher),
Indexer: i,
namespaced: opts.IsNamespaced,
indexedFields: indexedFields,
watchers: make(map[*watchKey]*watcher),
}
l.RegisterAfterAdd(l.addIndexFields)
l.RegisterAfterAdd(l.addLabels)
l.RegisterAfterAdd(l.notifyEventAdded)
l.RegisterAfterAdd(l.deleteOldEvents)
l.RegisterAfterUpdate(l.addIndexFields)
l.RegisterAfterUpdate(l.addLabels)
l.RegisterAfterUpdate(l.notifyEventModified)
l.RegisterAfterUpdate(l.deleteOldEvents)
l.RegisterAfterDelete(l.deleteFieldsByKey)
l.RegisterAfterDelete(l.deleteLabelsByKey)
l.RegisterAfterDelete(l.notifyEventDeleted)
l.RegisterAfterDelete(l.deleteOldEvents)
l.RegisterAfterDeleteAll(l.deleteFields)
l.RegisterAfterDeleteAll(l.deleteLabels)
columnDefs := make([]string, len(indexedFields))
@@ -206,16 +199,18 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp
return &db.QueryError{QueryString: createEventsTableFmt, Err: err}
}
_, err = tx.Exec(fmt.Sprintf(createFieldsTableFmt, dbName, strings.Join(columnDefs, ", ")))
createFieldsTableQuery := fmt.Sprintf(createFieldsTableFmt, dbName, strings.Join(columnDefs, ", "))
_, err = tx.Exec(createFieldsTableQuery)
if err != nil {
return err
return &db.QueryError{QueryString: createFieldsTableQuery, Err: err}
}
for index, field := range indexedFields {
// create index for field
_, err = tx.Exec(fmt.Sprintf(createFieldsIndexFmt, dbName, field, dbName, field))
createFieldsIndexQuery := fmt.Sprintf(createFieldsIndexFmt, dbName, field, dbName, field)
_, err = tx.Exec(createFieldsIndexQuery)
if err != nil {
return err
return &db.QueryError{QueryString: createFieldsIndexQuery, Err: err}
}
// format field into column for prepared statement
@@ -283,6 +278,8 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp
l.deleteLabelsByKeyStmt = l.Prepare(l.deleteLabelsByKeyQuery)
l.deleteLabelsStmt = l.Prepare(l.deleteLabelsQuery)
go l.runGC(ctx, opts.GCInterval, opts.GCKeepCount)
return l, nil
}
@@ -479,18 +476,6 @@ func (l *ListOptionIndexer) notifyEvent(eventType watch.EventType, oldObj any, o
return nil
}
func (l *ListOptionIndexer) deleteOldEvents(key string, obj any, tx transaction.Client) error {
if l.maximumEventsCount == 0 {
return nil
}
_, err := tx.Stmt(l.deleteEventsByCountStmt).Exec(l.maximumEventsCount)
if err != nil {
return &db.QueryError{QueryString: l.deleteEventsByCountQuery, Err: err}
}
return nil
}
// addIndexFields saves sortable/filterable fields into tables
func (l *ListOptionIndexer) addIndexFields(key string, obj any, tx transaction.Client) error {
args := []any{key}
@@ -817,7 +802,7 @@ func (l *ListOptionIndexer) executeQuery(ctx context.Context, queryInfo *QueryIn
}
items, err = l.ReadObjects(rows, l.GetType(), l.GetShouldEncrypt())
if err != nil {
return err
return fmt.Errorf("read objects: %w", err)
}
total = len(items)
@@ -1379,3 +1364,32 @@ func matchFilter(filterName string, filterNamespace string, filterSelector label
}
return true
}
func (l *ListOptionIndexer) runGC(ctx context.Context, interval time.Duration, keepCount int) {
if interval == 0 || keepCount == 0 {
return
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
logrus.Infof("Started SQL cache garbage collection for %s (interval=%s, keep=%d)", l.GetName(), interval, keepCount)
for {
select {
case <-ticker.C:
err := l.WithTransaction(ctx, true, func(tx transaction.Client) error {
_, err := tx.Stmt(l.deleteEventsByCountStmt).Exec(keepCount)
if err != nil {
return &db.QueryError{QueryString: l.deleteEventsByCountQuery, Err: err}
}
return nil
})
if err != nil {
logrus.Errorf("garbage collection for %s: %v", l.GetName(), err)
}
case <-ctx.Done():
return
}
}
}

View File

@@ -98,9 +98,9 @@ func TestNewListOptionIndexer(t *testing.T) {
store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes()
// end NewIndexer() logic
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
// create events table
@@ -175,9 +175,9 @@ func TestNewListOptionIndexer(t *testing.T) {
store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes()
// end NewIndexer() logic
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
store.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(fmt.Errorf("error"))
@@ -210,9 +210,9 @@ func TestNewListOptionIndexer(t *testing.T) {
store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes()
// end NewIndexer() logic
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil)
@@ -255,9 +255,9 @@ func TestNewListOptionIndexer(t *testing.T) {
store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes()
// end NewIndexer() logic
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil)
@@ -304,9 +304,9 @@ func TestNewListOptionIndexer(t *testing.T) {
store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes()
// end NewIndexer() logic
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil)
@@ -2745,7 +2745,8 @@ func TestWatchGarbageCollection(t *testing.T) {
parentCtx := context.Background()
opts := ListOptionIndexerOptions{
MaximumEventsCount: 2,
GCInterval: 40 * time.Millisecond,
GCKeepCount: 2,
}
loi, dbPath, err := makeListOptionIndexer(parentCtx, opts)
defer cleanTempFiles(dbPath)
@@ -2774,6 +2775,9 @@ func TestWatchGarbageCollection(t *testing.T) {
assert.NoError(t, err)
rv4 := getRV(t)
// Make sure GC runs
time.Sleep(2 * opts.GCInterval)
for _, rv := range []string{rv1, rv2} {
watcherCh, errCh := startWatcher(parentCtx, loi, rv)
gotEvents := receiveEvents(watcherCh)
@@ -2811,6 +2815,9 @@ func TestWatchGarbageCollection(t *testing.T) {
assert.NoError(t, err)
rv5 := getRV(t)
// Make sure GC runs
time.Sleep(2 * opts.GCInterval)
for _, rv := range []string{rv1, rv2, rv3} {
watcherCh, errCh := startWatcher(parentCtx, loi, rv)
gotEvents := receiveEvents(watcherCh)

View File

@@ -317,7 +317,7 @@ func (s *Store) Reset() error {
s.lock.Lock()
defer s.lock.Unlock()
if err := s.cacheFactory.Reset(); err != nil {
return err
return fmt.Errorf("reset: %w", err)
}
if err := s.initializeNamespaceCache(); err != nil {
@@ -784,7 +784,7 @@ func (s *Store) ListByPartitions(apiOp *types.APIRequest, apiSchema *types.APISc
ns := attributes.Namespaced(apiSchema)
inf, err := s.cacheFactory.CacheFor(s.ctx, fields, externalGVKDependencies[gvk], selfGVKDependencies[gvk], transformFunc, tableClient, gvk, ns, controllerschema.IsListWatchable(apiSchema))
if err != nil {
return nil, 0, "", err
return nil, 0, "", fmt.Errorf("cachefor %v: %w", gvk, err)
}
if gvk.Group == "ext.cattle.io" && (gvk.Kind == "Token" || gvk.Kind == "Kubeconfig") {
accessSet := accesscontrol.AccessSetFromAPIRequest(apiOp)
@@ -814,7 +814,7 @@ func (s *Store) ListByPartitions(apiOp *types.APIRequest, apiSchema *types.APISc
if errors.Is(err, informer.ErrInvalidColumn) {
return nil, 0, "", apierror.NewAPIError(validation.InvalidBodyContent, err.Error())
}
return nil, 0, "", err
return nil, 0, "", fmt.Errorf("listbyoptions %v: %w", gvk, err)
}
return list, total, continueToken, nil