1
0
mirror of https://github.com/rancher/steve.git synced 2025-09-10 11:49:15 +00:00

Fix race condition with database reset (#804)

* Wait for garbage collection to be finished before creating a new DB

* Prevent DB reset while inflight List requests

* Rename cacheFor to cacheForLocked

* Rename Reset to Stop

* Mention DoneWithCache in README

* Add RunGC to ByOptionsLister interface
This commit is contained in:
Tom Lebreux
2025-09-05 11:30:56 -04:00
committed by GitHub
parent 0adae191dd
commit 4bd760b22d
13 changed files with 311 additions and 79 deletions

View File

@@ -82,6 +82,10 @@ intended to be used as a way of enforcing RBAC.
if err != nil {
panic(err)
}
// Don't forget to let the cache factory know you're done with
// the cache. This unlocks a global lock used to prevent
// Stop()ing the cache when requests are in flight.
defer cacheFactory.DoneWithCache(c)
// continueToken will just be an offset that can be used in Resume on a subsequent request to continue
// to next page

View File

@@ -108,11 +108,20 @@ func NewCacheFactory(opts CacheFactoryOptions) (*CacheFactory, error) {
// CacheFor returns an informer for given GVK, using sql store indexed with fields, using the specified client. For virtual fields, they must be added by the transform function
// and specified by fields to be used for later fields.
func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (Cache, error) {
//
// Don't forget to call DoneWithCache with the given informer once done with it.
func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*Cache, error) {
// First of all block Reset() until we are done
f.mutex.RLock()
defer f.mutex.RUnlock()
cache, err := f.cacheForLocked(ctx, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, namespaced, watchable)
if err != nil {
f.mutex.RUnlock()
return nil, err
}
return cache, nil
}
func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*Cache, error) {
// Second, check if the informer and its accompanying informer-specific mutex exist already in the informers cache
// If not, start by creating such informer-specific mutex. That is used later to ensure no two goroutines create
// informers for the same GVK at the same type
@@ -146,7 +155,7 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external
shouldEncrypt := f.encryptAll || encryptResourceAlways
i, err := f.newInformer(f.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, f.gcInterval, f.gcKeepCount)
if err != nil {
return Cache{}, err
return nil, err
}
err = i.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
@@ -157,7 +166,7 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external
cache.DefaultWatchErrorHandler(ctx, r, err)
})
if err != nil {
return Cache{}, err
return nil, err
}
f.wg.StartWithChannel(f.ctx.Done(), i.Run)
@@ -166,16 +175,25 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external
}
if !cache.WaitForCacheSync(f.ctx.Done(), gi.informer.HasSynced) {
return Cache{}, fmt.Errorf("failed to sync SQLite Informer cache for GVK %v", gvk)
return nil, fmt.Errorf("failed to sync SQLite Informer cache for GVK %v", gvk)
}
// At this point the informer is ready, return it
return Cache{ByOptionsLister: gi.informer}, nil
return &Cache{ByOptionsLister: gi.informer}, nil
}
// Reset cancels ctx which stops any running informers, assigns a new ctx, resets the GVK-informer cache, and resets
// DoneWithCache must be called for every CacheFor call.
//
// This ensures that there aren't any inflight list requests while we are resetting the database.
//
// TODO: Use the *Cache once we go per-GVK
func (f *CacheFactory) DoneWithCache(_ *Cache) {
f.mutex.RUnlock()
}
// Stop cancels ctx which stops any running informers, assigns a new ctx, resets the GVK-informer cache, and resets
// the database connection which wipes any current sqlite database at the default location.
func (f *CacheFactory) Reset() error {
func (f *CacheFactory) Stop() error {
if f.dbClient == nil {
// nothing to reset
return nil
@@ -187,8 +205,8 @@ func (f *CacheFactory) Reset() error {
// now that we are alone, stop all informers created until this point
f.cancel()
f.ctx, f.cancel = context.WithCancel(context.Background())
f.wg.Wait()
f.ctx, f.cancel = context.WithCancel(context.Background())
// and get rid of all references to those informers and their mutexes
f.informersMutex.Lock()

View File

@@ -21,6 +21,7 @@ import (
//go:generate go run go.uber.org/mock/mockgen@latest --build_flags=--mod=mod -package factory -destination ./transaction_mocks_tests.go -mock_names Client=MockTXClient github.com/rancher/steve/pkg/sqlcache/db/transaction Client
//go:generate go run go.uber.org/mock/mockgen@latest --build_flags=--mod=mod -package factory -destination ./dynamic_mocks_test.go k8s.io/client-go/dynamic ResourceInterface
//go:generate go run go.uber.org/mock/mockgen@latest --build_flags=--mod=mod -package factory -destination ./k8s_cache_mocks_test.go k8s.io/client-go/tools/cache SharedIndexInformer
//go:generate go run go.uber.org/mock/mockgen@latest --build_flags=--mod=mod -package factory -destination ./sql_informer_mocks_test.go github.com/rancher/steve/pkg/sqlcache/informer ByOptionsLister
func TestNewCacheFactory(t *testing.T) {
type testCase struct {
@@ -64,6 +65,8 @@ func TestCacheFor(t *testing.T) {
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
fields := [][]string{{"something"}}
expectedGVK := schema.GroupVersionKind{}
bloi := NewMockByOptionsLister(gomock.NewController(t))
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
sii := NewMockSharedIndexInformer(gomock.NewController(t))
sii.EXPECT().HasSynced().Return(true).AnyTimes()
sii.EXPECT().Run(gomock.Any()).MinTimes(1)
@@ -71,8 +74,9 @@ func TestCacheFor(t *testing.T) {
i := &informer.Informer{
// need to set this so Run function is not nil
SharedIndexInformer: sii,
ByOptionsLister: bloi,
}
expectedC := Cache{
expectedC := &Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
@@ -97,7 +101,7 @@ func TestCacheFor(t *testing.T) {
time.Sleep(5 * time.Second)
f.cancel()
}()
var c Cache
var c *Cache
var err error
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
assert.Nil(t, err)
@@ -114,6 +118,8 @@ func TestCacheFor(t *testing.T) {
fields := [][]string{{"something"}}
expectedGVK := schema.GroupVersionKind{}
bloi := NewMockByOptionsLister(gomock.NewController(t))
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
sii := NewMockSharedIndexInformer(gomock.NewController(t))
sii.EXPECT().HasSynced().Return(false).AnyTimes()
sii.EXPECT().Run(gomock.Any())
@@ -121,6 +127,7 @@ func TestCacheFor(t *testing.T) {
expectedI := &informer.Informer{
// need to set this so Run function is not nil
SharedIndexInformer: sii,
ByOptionsLister: bloi,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
@@ -154,6 +161,8 @@ func TestCacheFor(t *testing.T) {
fields := [][]string{{"something"}}
expectedGVK := schema.GroupVersionKind{}
bloi := NewMockByOptionsLister(gomock.NewController(t))
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
sii := NewMockSharedIndexInformer(gomock.NewController(t))
sii.EXPECT().HasSynced().Return(true).AnyTimes()
// may or may not call run initially
@@ -162,8 +171,9 @@ func TestCacheFor(t *testing.T) {
i := &informer.Informer{
// need to set this so Run function is not nil
SharedIndexInformer: sii,
ByOptionsLister: bloi,
}
expectedC := Cache{
expectedC := &Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
@@ -184,7 +194,7 @@ func TestCacheFor(t *testing.T) {
f.ctx, f.cancel = context.WithCancel(context.Background())
f.cancel()
var c Cache
var c *Cache
var err error
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
assert.Nil(t, err)
@@ -196,6 +206,8 @@ func TestCacheFor(t *testing.T) {
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
fields := [][]string{{"something"}}
expectedGVK := schema.GroupVersionKind{}
bloi := NewMockByOptionsLister(gomock.NewController(t))
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
sii := NewMockSharedIndexInformer(gomock.NewController(t))
sii.EXPECT().HasSynced().Return(true)
sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes()
@@ -203,8 +215,9 @@ func TestCacheFor(t *testing.T) {
i := &informer.Informer{
// need to set this so Run function is not nil
SharedIndexInformer: sii,
ByOptionsLister: bloi,
}
expectedC := Cache{
expectedC := &Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
@@ -229,7 +242,7 @@ func TestCacheFor(t *testing.T) {
time.Sleep(10 * time.Second)
f.cancel()
}()
var c Cache
var c *Cache
var err error
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
assert.Nil(t, err)
@@ -246,6 +259,8 @@ func TestCacheFor(t *testing.T) {
Version: "v1",
Kind: "Secret",
}
bloi := NewMockByOptionsLister(gomock.NewController(t))
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
sii := NewMockSharedIndexInformer(gomock.NewController(t))
sii.EXPECT().HasSynced().Return(true)
sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes()
@@ -253,8 +268,9 @@ func TestCacheFor(t *testing.T) {
i := &informer.Informer{
// need to set this so Run function is not nil
SharedIndexInformer: sii,
ByOptionsLister: bloi,
}
expectedC := Cache{
expectedC := &Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
@@ -279,7 +295,7 @@ func TestCacheFor(t *testing.T) {
time.Sleep(10 * time.Second)
f.cancel()
}()
var c Cache
var c *Cache
var err error
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
assert.Nil(t, err)
@@ -295,6 +311,8 @@ func TestCacheFor(t *testing.T) {
Version: "v3",
Kind: "Token",
}
bloi := NewMockByOptionsLister(gomock.NewController(t))
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
sii := NewMockSharedIndexInformer(gomock.NewController(t))
sii.EXPECT().HasSynced().Return(true)
sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes()
@@ -302,8 +320,9 @@ func TestCacheFor(t *testing.T) {
i := &informer.Informer{
// need to set this so Run function is not nil
SharedIndexInformer: sii,
ByOptionsLister: bloi,
}
expectedC := Cache{
expectedC := &Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
@@ -328,7 +347,7 @@ func TestCacheFor(t *testing.T) {
time.Sleep(10 * time.Second)
f.cancel()
}()
var c Cache
var c *Cache
var err error
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
assert.Nil(t, err)
@@ -341,6 +360,8 @@ func TestCacheFor(t *testing.T) {
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
fields := [][]string{{"something"}}
expectedGVK := schema.GroupVersionKind{}
bloi := NewMockByOptionsLister(gomock.NewController(t))
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
sii := NewMockSharedIndexInformer(gomock.NewController(t))
sii.EXPECT().HasSynced().Return(true)
sii.EXPECT().Run(gomock.Any()).MinTimes(1)
@@ -351,8 +372,9 @@ func TestCacheFor(t *testing.T) {
i := &informer.Informer{
// need to set this so Run function is not nil
SharedIndexInformer: sii,
ByOptionsLister: bloi,
}
expectedC := Cache{
expectedC := &Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
@@ -385,7 +407,7 @@ func TestCacheFor(t *testing.T) {
time.Sleep(5 * time.Second)
f.cancel()
}()
var c Cache
var c *Cache
var err error
c, err = f.CacheFor(context.Background(), fields, nil, nil, transformFunc, dynamicClient, expectedGVK, false, true)
assert.Nil(t, err)
@@ -397,6 +419,8 @@ func TestCacheFor(t *testing.T) {
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
fields := [][]string{{"something"}}
expectedGVK := schema.GroupVersionKind{}
bloi := NewMockByOptionsLister(gomock.NewController(t))
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
sii := NewMockSharedIndexInformer(gomock.NewController(t))
sii.EXPECT().HasSynced().Return(true)
sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes()
@@ -404,8 +428,9 @@ func TestCacheFor(t *testing.T) {
i := &informer.Informer{
// need to set this so Run function is not nil
SharedIndexInformer: sii,
ByOptionsLister: bloi,
}
expectedC := Cache{
expectedC := &Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
@@ -433,7 +458,7 @@ func TestCacheFor(t *testing.T) {
time.Sleep(10 * time.Second)
f.cancel()
}()
var c Cache
var c *Cache
var err error
// CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool)
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)

View File

@@ -0,0 +1,103 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/rancher/steve/pkg/sqlcache/informer (interfaces: ByOptionsLister)
//
// Generated by this command:
//
// mockgen --build_flags=--mod=mod -package factory -destination ./sql_informer_mocks_test.go github.com/rancher/steve/pkg/sqlcache/informer ByOptionsLister
//
// Package factory is a generated GoMock package.
package factory
import (
context "context"
reflect "reflect"
informer "github.com/rancher/steve/pkg/sqlcache/informer"
partition "github.com/rancher/steve/pkg/sqlcache/partition"
sqltypes "github.com/rancher/steve/pkg/sqlcache/sqltypes"
gomock "go.uber.org/mock/gomock"
unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
watch "k8s.io/apimachinery/pkg/watch"
)
// MockByOptionsLister is a mock of ByOptionsLister interface.
type MockByOptionsLister struct {
ctrl *gomock.Controller
recorder *MockByOptionsListerMockRecorder
isgomock struct{}
}
// MockByOptionsListerMockRecorder is the mock recorder for MockByOptionsLister.
type MockByOptionsListerMockRecorder struct {
mock *MockByOptionsLister
}
// NewMockByOptionsLister creates a new mock instance.
func NewMockByOptionsLister(ctrl *gomock.Controller) *MockByOptionsLister {
mock := &MockByOptionsLister{ctrl: ctrl}
mock.recorder = &MockByOptionsListerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockByOptionsLister) EXPECT() *MockByOptionsListerMockRecorder {
return m.recorder
}
// GetLatestResourceVersion mocks base method.
func (m *MockByOptionsLister) GetLatestResourceVersion() []string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetLatestResourceVersion")
ret0, _ := ret[0].([]string)
return ret0
}
// GetLatestResourceVersion indicates an expected call of GetLatestResourceVersion.
func (mr *MockByOptionsListerMockRecorder) GetLatestResourceVersion() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLatestResourceVersion", reflect.TypeOf((*MockByOptionsLister)(nil).GetLatestResourceVersion))
}
// ListByOptions mocks base method.
func (m *MockByOptionsLister) ListByOptions(ctx context.Context, lo *sqltypes.ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ListByOptions", ctx, lo, partitions, namespace)
ret0, _ := ret[0].(*unstructured.UnstructuredList)
ret1, _ := ret[1].(int)
ret2, _ := ret[2].(string)
ret3, _ := ret[3].(error)
return ret0, ret1, ret2, ret3
}
// ListByOptions indicates an expected call of ListByOptions.
func (mr *MockByOptionsListerMockRecorder) ListByOptions(ctx, lo, partitions, namespace any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByOptions", reflect.TypeOf((*MockByOptionsLister)(nil).ListByOptions), ctx, lo, partitions, namespace)
}
// RunGC mocks base method.
func (m *MockByOptionsLister) RunGC(arg0 context.Context) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "RunGC", arg0)
}
// RunGC indicates an expected call of RunGC.
func (mr *MockByOptionsListerMockRecorder) RunGC(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunGC", reflect.TypeOf((*MockByOptionsLister)(nil).RunGC), arg0)
}
// Watch mocks base method.
func (m *MockByOptionsLister) Watch(ctx context.Context, options informer.WatchOptions, eventsCh chan<- watch.Event) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Watch", ctx, options, eventsCh)
ret0, _ := ret[0].(error)
return ret0
}
// Watch indicates an expected call of Watch.
func (mr *MockByOptionsListerMockRecorder) Watch(ctx, options, eventsCh any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockByOptionsLister)(nil).Watch), ctx, options, eventsCh)
}

View File

@@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
@@ -49,6 +50,7 @@ type ByOptionsLister interface {
ListByOptions(ctx context.Context, lo *sqltypes.ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error)
Watch(ctx context.Context, options WatchOptions, eventsCh chan<- watch.Event) error
GetLatestResourceVersion() []string
RunGC(context.Context)
}
// this is set to a var so that it can be overridden by test code for mocking purposes
@@ -138,6 +140,22 @@ func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [
}, nil
}
// Run implements [cache.SharedIndexInformer]
func (i *Informer) Run(stopCh <-chan struct{}) {
var wg wait.Group
wg.StartWithChannel(stopCh, i.SharedIndexInformer.Run)
wg.StartWithContext(wait.ContextForChannel(stopCh), i.ByOptionsLister.RunGC)
wg.Wait()
}
// RunWithContext implements [cache.SharedIndexInformer]
func (i *Informer) RunWithContext(ctx context.Context) {
var wg wait.Group
wg.StartWithContext(ctx, i.SharedIndexInformer.RunWithContext)
wg.StartWithContext(ctx, i.ByOptionsLister.RunGC)
wg.Wait()
}
// ListByOptions returns objects according to the specified list options and partitions.
// Specifically:
// - an unstructured list of resources belonging to any of the specified partitions

View File

@@ -75,6 +75,18 @@ func (mr *MockByOptionsListerMockRecorder) ListByOptions(ctx, lo, partitions, na
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByOptions", reflect.TypeOf((*MockByOptionsLister)(nil).ListByOptions), ctx, lo, partitions, namespace)
}
// RunGC mocks base method.
func (m *MockByOptionsLister) RunGC(arg0 context.Context) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "RunGC", arg0)
}
// RunGC indicates an expected call of RunGC.
func (mr *MockByOptionsListerMockRecorder) RunGC(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunGC", reflect.TypeOf((*MockByOptionsLister)(nil).RunGC), arg0)
}
// Watch mocks base method.
func (m *MockByOptionsLister) Watch(ctx context.Context, options WatchOptions, eventsCh chan<- watch.Event) error {
m.ctrl.T.Helper()

View File

@@ -44,6 +44,11 @@ type ListOptionIndexer struct {
watchersLock sync.RWMutex
watchers map[*watchKey]*watcher
// gcInterval is how often to run the garbage collection
gcInterval time.Duration
// gcKeepCount is how many events to keep in _events table when gc runs
gcKeepCount int
upsertEventsQuery string
findEventsRowByRVQuery string
listEventsAfterQuery string
@@ -282,7 +287,8 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp
l.deleteLabelsByKeyStmt = l.Prepare(l.deleteLabelsByKeyQuery)
l.deleteLabelsStmt = l.Prepare(l.deleteLabelsQuery)
go l.runGC(ctx, opts.GCInterval, opts.GCKeepCount)
l.gcInterval = opts.GCInterval
l.gcKeepCount = opts.GCKeepCount
return l, nil
}
@@ -1555,21 +1561,22 @@ func matchFilter(filterName string, filterNamespace string, filterSelector label
return true
}
func (l *ListOptionIndexer) runGC(ctx context.Context, interval time.Duration, keepCount int) {
if interval == 0 || keepCount == 0 {
func (l *ListOptionIndexer) RunGC(ctx context.Context) {
if l.gcInterval == 0 || l.gcKeepCount == 0 {
return
}
ticker := time.NewTicker(interval)
ticker := time.NewTicker(l.gcInterval)
defer ticker.Stop()
logrus.Infof("Started SQL cache garbage collection for %s (interval=%s, keep=%d)", l.GetName(), interval, keepCount)
logrus.Infof("Started SQL cache garbage collection for %s (interval=%s, keep=%d)", l.GetName(), l.gcInterval, l.gcKeepCount)
defer logrus.Infof("Stopped SQL cache garbage collection for %s (interval=%s, keep=%d)", l.GetName(), l.gcInterval, l.gcKeepCount)
for {
select {
case <-ticker.C:
err := l.WithTransaction(ctx, true, func(tx transaction.Client) error {
_, err := tx.Stmt(l.deleteEventsByCountStmt).Exec(keepCount)
_, err := tx.Stmt(l.deleteEventsByCountStmt).Exec(l.gcKeepCount)
if err != nil {
return &db.QueryError{QueryString: l.deleteEventsByCountQuery, Err: err}
}

View File

@@ -100,6 +100,8 @@ func makeListOptionIndexer(ctx context.Context, opts ListOptionIndexerOptions, s
return nil, "", err
}
go listOptionIndexer.RunGC(ctx)
return listOptionIndexer, dbPath, nil
}

View File

@@ -95,7 +95,10 @@ func (i *IntegrationSuite) TestSQLCacheFilters() {
cache, cacheFactory, err := i.createCacheAndFactory(fields, nil)
require.NoError(err)
defer cacheFactory.Reset()
defer func() {
cacheFactory.DoneWithCache(cache)
cacheFactory.Stop()
}()
// doesn't match the filter for somekey == somevalue
notMatches := configMapWithAnnotations("not-matches-filter", map[string]string{"somekey": "notequal"})
@@ -327,7 +330,7 @@ func (i *IntegrationSuite) createCacheAndFactory(fields [][]string, transformFun
if err != nil {
return nil, nil, fmt.Errorf("unable to make cache: %w", err)
}
return &cache, cacheFactory, nil
return cache, cacheFactory, nil
}
func (i *IntegrationSuite) waitForCacheReady(readyResourceNames []string, namespace string, cache *factory.Cache) error {