1
0
mirror of https://github.com/rancher/steve.git synced 2025-09-08 02:39:26 +00:00

Add garbage collection for events table (#664)

* Add ListOptionIndexerOptions to create ListOptionIndexer

* Remove unused function

* Implement configurable garbage collection on ListOptionIndexer

* Propagate Steve options from Server
This commit is contained in:
Tom Lebreux
2025-06-10 10:02:55 -06:00
committed by GitHub
parent 39fed09b2e
commit f258ebcf31
10 changed files with 469 additions and 116 deletions

View File

@@ -33,6 +33,9 @@ type CacheFactory struct {
mutex sync.RWMutex
encryptAll bool
defaultMaximumEventsCount int
perGVKMaximumEventsCount map[schema.GroupVersionKind]int
newInformer newInformer
informers map[schema.GroupVersionKind]*guardedInformer
@@ -44,7 +47,7 @@ type guardedInformer struct {
mutex *sync.Mutex
}
type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool) (*informer.Informer, error)
type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool, maxEventsCount int) (*informer.Informer, error)
type Cache struct {
informer.ByOptionsLister
@@ -62,9 +65,25 @@ var defaultEncryptedResourceTypes = map[schema.GroupVersionKind]struct{}{
}: {},
}
type CacheFactoryOptions struct {
// DefaultMaximumEventsCount is the maximum number of events to keep in
// the events table by default.
//
// Use PerGVKMaximumEventsCount if you want to set a different value for
// a specific GVK.
//
// A value of 0 means no limits.
DefaultMaximumEventsCount int
// PerGVKMaximumEventsCount is the maximum number of events to keep in
// the events table for specific GVKs.
//
// A value of 0 means no limits.
PerGVKMaximumEventsCount map[schema.GroupVersionKind]int
}
// NewCacheFactory returns an informer factory instance
// This is currently called from steve via initial calls to `s.cacheFactory.CacheFor(...)`
func NewCacheFactory() (*CacheFactory, error) {
func NewCacheFactory(opts CacheFactoryOptions) (*CacheFactory, error) {
m, err := encryption.NewManager()
if err != nil {
return nil, err
@@ -74,10 +93,14 @@ func NewCacheFactory() (*CacheFactory, error) {
return nil, err
}
return &CacheFactory{
wg: wait.Group{},
stopCh: make(chan struct{}),
encryptAll: os.Getenv(EncryptAllEnvVar) == "true",
dbClient: dbClient,
wg: wait.Group{},
stopCh: make(chan struct{}),
encryptAll: os.Getenv(EncryptAllEnvVar) == "true",
dbClient: dbClient,
defaultMaximumEventsCount: opts.DefaultMaximumEventsCount,
perGVKMaximumEventsCount: opts.PerGVKMaximumEventsCount,
newInformer: informer.NewInformer,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}, nil
@@ -121,7 +144,8 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, transfor
_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
shouldEncrypt := f.encryptAll || encryptResourceAlways
i, err := f.newInformer(ctx, client, fields, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable)
maxEventsCount := f.getMaximumEventsCount(gvk)
i, err := f.newInformer(ctx, client, fields, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, maxEventsCount)
if err != nil {
return Cache{}, err
}
@@ -150,6 +174,13 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, transfor
return Cache{ByOptionsLister: gi.informer}, nil
}
func (f *CacheFactory) getMaximumEventsCount(gvk schema.GroupVersionKind) int {
if maxCount, ok := f.perGVKMaximumEventsCount[gvk]; ok {
return maxCount
}
return f.defaultMaximumEventsCount
}
// Reset closes the stopCh which stops any running informers, assigns a new stopCh, resets the GVK-informer cache, and resets
// the database connection which wipes any current sqlite database at the default location.
func (f *CacheFactory) Reset() error {

View File

@@ -30,7 +30,7 @@ func TestNewCacheFactory(t *testing.T) {
var tests []testCase
tests = append(tests, testCase{description: "NewCacheFactory() with no errors returned, should return no errors", test: func(t *testing.T) {
f, err := NewCacheFactory()
f, err := NewCacheFactory(CacheFactoryOptions{})
assert.Nil(t, err)
assert.NotNil(t, f.dbClient)
assert.False(t, f.encryptAll)
@@ -38,7 +38,7 @@ func TestNewCacheFactory(t *testing.T) {
tests = append(tests, testCase{description: "NewCacheFactory() with no errors returned and EncryptAllEnvVar set to true, should return no errors and have encryptAll set to true", test: func(t *testing.T) {
err := os.Setenv(EncryptAllEnvVar, "true")
assert.Nil(t, err)
f, err := NewCacheFactory()
f, err := NewCacheFactory(CacheFactoryOptions{})
assert.Nil(t, err)
assert.Nil(t, err)
assert.NotNil(t, f.dbClient)
@@ -74,12 +74,13 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, false, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
return i, nil
}
f := &CacheFactory{
@@ -119,12 +120,13 @@ func TestCacheFor(t *testing.T) {
// need to set this so Run function is not nil
SharedIndexInformer: sii,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, false, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
return expectedI, nil
}
f := &CacheFactory{
@@ -161,12 +163,13 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, false, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
return i, nil
}
f := &CacheFactory{
@@ -200,12 +203,13 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, true, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
return i, nil
}
f := &CacheFactory{
@@ -248,12 +252,13 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, true, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
return i, nil
}
f := &CacheFactory{
@@ -295,12 +300,13 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, true, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
return i, nil
}
f := &CacheFactory{
@@ -342,7 +348,7 @@ func TestCacheFor(t *testing.T) {
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*informer.Informer, error) {
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
// we can't test func == func, so instead we check if the output was as expected
input := "someinput"
ouput, err := transform(input)
@@ -356,6 +362,7 @@ func TestCacheFor(t *testing.T) {
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, false, shouldEncrypt)
assert.Equal(t, 0, maxEventsCount)
return i, nil
}
f := &CacheFactory{
@@ -377,6 +384,103 @@ func TestCacheFor(t *testing.T) {
assert.Equal(t, expectedC, c)
time.Sleep(1 * time.Second)
}})
tests = append(tests, testCase{description: "CacheFor() with default max events count", test: func(t *testing.T) {
dbClient := NewMockClient(gomock.NewController(t))
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
fields := [][]string{{"something"}}
expectedGVK := schema.GroupVersionKind{}
sii := NewMockSharedIndexInformer(gomock.NewController(t))
sii.EXPECT().HasSynced().Return(true)
sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes()
sii.EXPECT().SetWatchErrorHandler(gomock.Any())
i := &informer.Informer{
// need to set this so Run function is not nil
SharedIndexInformer: sii,
}
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, true, shouldEncrypt)
assert.Equal(t, 10, maxEventsCount)
return i, nil
}
f := &CacheFactory{
defaultMaximumEventsCount: 10,
dbClient: dbClient,
stopCh: make(chan struct{}),
newInformer: testNewInformer,
encryptAll: true,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}
go func() {
time.Sleep(10 * time.Second)
close(f.stopCh)
}()
var c Cache
var err error
c, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true)
assert.Nil(t, err)
assert.Equal(t, expectedC, c)
time.Sleep(1 * time.Second)
}})
tests = append(tests, testCase{description: "CacheFor() with per GVK maximum events count", test: func(t *testing.T) {
dbClient := NewMockClient(gomock.NewController(t))
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
fields := [][]string{{"something"}}
expectedGVK := schema.GroupVersionKind{
Group: "management.cattle.io",
Version: "v3",
Kind: "Token",
}
sii := NewMockSharedIndexInformer(gomock.NewController(t))
sii.EXPECT().HasSynced().Return(true)
sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes()
sii.EXPECT().SetWatchErrorHandler(gomock.Any())
i := &informer.Informer{
// need to set this so Run function is not nil
SharedIndexInformer: sii,
}
expectedC := Cache{
ByOptionsLister: i,
}
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) {
assert.Equal(t, client, dynamicClient)
assert.Equal(t, fields, fields)
assert.Equal(t, expectedGVK, gvk)
assert.Equal(t, db, dbClient)
assert.Equal(t, true, shouldEncrypt)
assert.Equal(t, 10, maxEventsCount)
return i, nil
}
f := &CacheFactory{
defaultMaximumEventsCount: 5,
perGVKMaximumEventsCount: map[schema.GroupVersionKind]int{
expectedGVK: 10,
},
dbClient: dbClient,
stopCh: make(chan struct{}),
newInformer: testNewInformer,
encryptAll: true,
informers: map[schema.GroupVersionKind]*guardedInformer{},
}
go func() {
time.Sleep(10 * time.Second)
close(f.stopCh)
}()
var c Cache
var err error
c, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true)
assert.Nil(t, err)
assert.Equal(t, expectedC, c)
time.Sleep(1 * time.Second)
}})
t.Parallel()
for _, test := range tests {
t.Run(test.description, func(t *testing.T) { test.test(t) })

View File

@@ -52,7 +52,7 @@ var newInformer = cache.NewSharedIndexInformer
// NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form
// using the specified client
func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*Informer, error) {
func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*Informer, error) {
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
return client.Watch(ctx, options)
}
@@ -101,7 +101,13 @@ func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [
if err != nil {
return nil, err
}
loi, err := NewListOptionIndexer(ctx, fields, s, namespaced)
opts := ListOptionIndexerOptions{
Fields: fields,
IsNamespaced: namespaced,
MaximumEventsCount: maxEventsCount,
}
loi, err := NewListOptionIndexer(ctx, s, opts)
if err != nil {
return nil, err
}

View File

@@ -81,7 +81,7 @@ func TestNewInformer(t *testing.T) {
}
})
informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0)
assert.Nil(t, err)
assert.NotNil(t, informer.ByOptionsLister)
assert.NotNil(t, informer.SharedIndexInformer)
@@ -105,7 +105,7 @@ func TestNewInformer(t *testing.T) {
}
})
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0)
assert.NotNil(t, err)
}})
tests = append(tests, testCase{description: "NewInformer() with errors returned from NewIndexer(), should return an error", test: func(t *testing.T) {
@@ -140,7 +140,7 @@ func TestNewInformer(t *testing.T) {
}
})
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0)
assert.NotNil(t, err)
}})
tests = append(tests, testCase{description: "NewInformer() with errors returned from NewListOptionIndexer(), should return an error", test: func(t *testing.T) {
@@ -193,7 +193,7 @@ func TestNewInformer(t *testing.T) {
}
})
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true)
_, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0)
assert.NotNil(t, err)
}})
tests = append(tests, testCase{description: "NewInformer() with transform func", test: func(t *testing.T) {
@@ -257,7 +257,7 @@ func TestNewInformer(t *testing.T) {
transformFunc := func(input interface{}) (interface{}, error) {
return "someoutput", nil
}
informer, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true)
informer, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true, 0)
assert.Nil(t, err)
assert.NotNil(t, informer.ByOptionsLister)
assert.NotNil(t, informer.SharedIndexInformer)
@@ -293,7 +293,7 @@ func TestNewInformer(t *testing.T) {
transformFunc := func(input interface{}) (interface{}, error) {
return "someoutput", nil
}
_, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true)
_, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true, 0)
assert.Error(t, err)
newInformer = cache.NewSharedIndexInformer
}})

View File

@@ -35,31 +35,36 @@ type ListOptionIndexer struct {
namespaced bool
indexedFields []string
// maximumEventsCount is how many events to keep. 0 means keep all events.
maximumEventsCount int
latestRVLock sync.RWMutex
latestRV string
watchersLock sync.RWMutex
watchers map[*watchKey]*watcher
upsertEventsQuery string
findEventsRowByRVQuery string
listEventsAfterQuery string
addFieldsQuery string
deleteFieldsByKeyQuery string
deleteFieldsQuery string
upsertLabelsQuery string
deleteLabelsByKeyQuery string
deleteLabelsQuery string
upsertEventsQuery string
findEventsRowByRVQuery string
listEventsAfterQuery string
deleteEventsByCountQuery string
addFieldsQuery string
deleteFieldsByKeyQuery string
deleteFieldsQuery string
upsertLabelsQuery string
deleteLabelsByKeyQuery string
deleteLabelsQuery string
upsertEventsStmt *sql.Stmt
findEventsRowByRVStmt *sql.Stmt
listEventsAfterStmt *sql.Stmt
addFieldsStmt *sql.Stmt
deleteFieldsByKeyStmt *sql.Stmt
deleteFieldsStmt *sql.Stmt
upsertLabelsStmt *sql.Stmt
deleteLabelsByKeyStmt *sql.Stmt
deleteLabelsStmt *sql.Stmt
upsertEventsStmt *sql.Stmt
findEventsRowByRVStmt *sql.Stmt
listEventsAfterStmt *sql.Stmt
deleteEventsByCountStmt *sql.Stmt
addFieldsStmt *sql.Stmt
deleteFieldsByKeyStmt *sql.Stmt
deleteFieldsStmt *sql.Stmt
upsertLabelsStmt *sql.Stmt
deleteLabelsByKeyStmt *sql.Stmt
deleteLabelsStmt *sql.Stmt
}
var (
@@ -91,6 +96,11 @@ const (
FROM "%s_events"
WHERE rv = ?
`
deleteEventsByCountFmt = `DELETE FROM "%s_events"
WHERE rowid
NOT IN (
SELECT rowid FROM "%s_events" ORDER BY rowid DESC LIMIT ?
)`
createFieldsTableFmt = `CREATE TABLE "%s_fields" (
key TEXT NOT NULL PRIMARY KEY,
@@ -114,10 +124,25 @@ const (
deleteLabelsStmtFmt = `DELETE FROM "%s_labels"`
)
type ListOptionIndexerOptions struct {
// Fields is a list of fields within the object that we want indexed for
// filtering & sorting. Each field is specified as a slice.
//
// For example, .metadata.resourceVersion should be specified as []string{"metadata", "resourceVersion"}
Fields [][]string
// IsNamespaced determines whether the GVK for this ListOptionIndexer is
// namespaced
IsNamespaced bool
// MaximumEventsCount is the maximum number of events we want to keep
// in the _events table.
//
// Zero means never delete events.
MaximumEventsCount int
}
// NewListOptionIndexer returns a SQLite-backed cache.Indexer of unstructured.Unstructured Kubernetes resources of a certain GVK
// ListOptionIndexer is also able to satisfy ListOption queries on indexed (sub)fields.
// Fields are specified as slices (e.g. "metadata.resourceVersion" is ["metadata", "resourceVersion"])
func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, namespaced bool) (*ListOptionIndexer, error) {
func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOptions) (*ListOptionIndexer, error) {
// necessary in order to gob/ungob unstructured.Unstructured objects
gob.Register(map[string]interface{}{})
gob.Register([]interface{}{})
@@ -131,28 +156,32 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names
for _, f := range defaultIndexedFields {
indexedFields = append(indexedFields, f)
}
if namespaced {
if opts.IsNamespaced {
indexedFields = append(indexedFields, defaultIndexNamespaced)
}
for _, f := range fields {
for _, f := range opts.Fields {
indexedFields = append(indexedFields, toColumnName(f))
}
l := &ListOptionIndexer{
Indexer: i,
namespaced: namespaced,
indexedFields: indexedFields,
watchers: make(map[*watchKey]*watcher),
Indexer: i,
namespaced: opts.IsNamespaced,
indexedFields: indexedFields,
maximumEventsCount: opts.MaximumEventsCount,
watchers: make(map[*watchKey]*watcher),
}
l.RegisterAfterAdd(l.addIndexFields)
l.RegisterAfterAdd(l.addLabels)
l.RegisterAfterAdd(l.notifyEventAdded)
l.RegisterAfterAdd(l.deleteOldEvents)
l.RegisterAfterUpdate(l.addIndexFields)
l.RegisterAfterUpdate(l.addLabels)
l.RegisterAfterUpdate(l.notifyEventModified)
l.RegisterAfterUpdate(l.deleteOldEvents)
l.RegisterAfterDelete(l.deleteFieldsByKey)
l.RegisterAfterDelete(l.deleteLabelsByKey)
l.RegisterAfterDelete(l.notifyEventDeleted)
l.RegisterAfterDelete(l.deleteOldEvents)
l.RegisterAfterDeleteAll(l.deleteFields)
l.RegisterAfterDeleteAll(l.deleteLabels)
columnDefs := make([]string, len(indexedFields))
@@ -226,6 +255,9 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names
l.findEventsRowByRVQuery = fmt.Sprintf(findEventsRowByRVFmt, dbName)
l.findEventsRowByRVStmt = l.Prepare(l.findEventsRowByRVQuery)
l.deleteEventsByCountQuery = fmt.Sprintf(deleteEventsByCountFmt, dbName, dbName)
l.deleteEventsByCountStmt = l.Prepare(l.deleteEventsByCountQuery)
l.addFieldsQuery = fmt.Sprintf(
`INSERT INTO "%s_fields"(key, %s) VALUES (?, %s) ON CONFLICT DO UPDATE SET %s`,
dbName,
@@ -450,6 +482,18 @@ func (l *ListOptionIndexer) notifyEvent(eventType watch.EventType, oldObj any, o
return nil
}
func (l *ListOptionIndexer) deleteOldEvents(key string, obj any, tx transaction.Client) error {
if l.maximumEventsCount == 0 {
return nil
}
_, err := tx.Stmt(l.deleteEventsByCountStmt).Exec(l.maximumEventsCount)
if err != nil {
return &db.QueryError{QueryString: l.deleteEventsByCountQuery, Err: err}
}
return nil
}
// addIndexFields saves sortable/filterable fields into tables
func (l *ListOptionIndexer) addIndexFields(key string, obj any, tx transaction.Client) error {
args := []any{key}

View File

@@ -30,7 +30,7 @@ import (
"k8s.io/client-go/tools/cache"
)
func makeListOptionIndexer(ctx context.Context, fields [][]string) (*ListOptionIndexer, error) {
func makeListOptionIndexer(ctx context.Context, opts ListOptionIndexerOptions) (*ListOptionIndexer, error) {
gvk := schema.GroupVersionKind{
Group: "",
Version: "v1",
@@ -54,7 +54,7 @@ func makeListOptionIndexer(ctx context.Context, fields [][]string) (*ListOptionI
return nil, err
}
listOptionIndexer, err := NewListOptionIndexer(ctx, fields, s, true)
listOptionIndexer, err := NewListOptionIndexer(ctx, s, opts)
if err != nil {
return nil, err
}
@@ -91,9 +91,9 @@ func TestNewListOptionIndexer(t *testing.T) {
store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes()
// end NewIndexer() logic
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
// create events table
@@ -115,7 +115,11 @@ func TestNewListOptionIndexer(t *testing.T) {
}
})
loi, err := NewListOptionIndexer(context.Background(), fields, store, true)
opts := ListOptionIndexerOptions{
Fields: fields,
IsNamespaced: true,
}
loi, err := NewListOptionIndexer(context.Background(), store, opts)
assert.Nil(t, err)
assert.NotNil(t, loi)
}})
@@ -136,7 +140,10 @@ func TestNewListOptionIndexer(t *testing.T) {
}
})
_, err := NewListOptionIndexer(context.Background(), fields, store, false)
opts := ListOptionIndexerOptions{
Fields: fields,
}
_, err := NewListOptionIndexer(context.Background(), store, opts)
assert.NotNil(t, err)
}})
tests = append(tests, testCase{description: "NewListOptionIndexer() with error returned from Begin(), should return an error", test: func(t *testing.T) {
@@ -161,14 +168,17 @@ func TestNewListOptionIndexer(t *testing.T) {
store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes()
// end NewIndexer() logic
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
store.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(fmt.Errorf("error"))
_, err := NewListOptionIndexer(context.Background(), fields, store, false)
opts := ListOptionIndexerOptions{
Fields: fields,
}
_, err := NewListOptionIndexer(context.Background(), store, opts)
assert.NotNil(t, err)
}})
tests = append(tests, testCase{description: "NewListOptionIndexer() with error from Exec() when creating fields table, should return an error", test: func(t *testing.T) {
@@ -193,9 +203,9 @@ func TestNewListOptionIndexer(t *testing.T) {
store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes()
// end NewIndexer() logic
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil)
@@ -209,7 +219,11 @@ func TestNewListOptionIndexer(t *testing.T) {
}
})
_, err := NewListOptionIndexer(context.Background(), fields, store, true)
opts := ListOptionIndexerOptions{
Fields: fields,
IsNamespaced: true,
}
_, err := NewListOptionIndexer(context.Background(), store, opts)
assert.NotNil(t, err)
}})
tests = append(tests, testCase{description: "NewListOptionIndexer() with error from create-labels, should return an error", test: func(t *testing.T) {
@@ -234,9 +248,9 @@ func TestNewListOptionIndexer(t *testing.T) {
store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes()
// end NewIndexer() logic
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil)
@@ -254,7 +268,11 @@ func TestNewListOptionIndexer(t *testing.T) {
}
})
_, err := NewListOptionIndexer(context.Background(), fields, store, true)
opts := ListOptionIndexerOptions{
Fields: fields,
IsNamespaced: true,
}
_, err := NewListOptionIndexer(context.Background(), store, opts)
assert.NotNil(t, err)
}})
tests = append(tests, testCase{description: "NewListOptionIndexer() with error from Commit(), should return an error", test: func(t *testing.T) {
@@ -279,9 +297,9 @@ func TestNewListOptionIndexer(t *testing.T) {
store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes()
// end NewIndexer() logic
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4)
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil)
@@ -300,7 +318,11 @@ func TestNewListOptionIndexer(t *testing.T) {
}
})
_, err := NewListOptionIndexer(context.Background(), fields, store, true)
opts := ListOptionIndexerOptions{
Fields: fields,
IsNamespaced: true,
}
_, err := NewListOptionIndexer(context.Background(), store, opts)
assert.NotNil(t, err)
}})
@@ -894,7 +916,11 @@ func TestNewListOptionIndexerEasy(t *testing.T) {
}
fields = append(fields, test.extraIndexedFields...)
loi, err := makeListOptionIndexer(ctx, fields)
opts := ListOptionIndexerOptions{
Fields: fields,
IsNamespaced: true,
}
loi, err := makeListOptionIndexer(ctx, opts)
assert.NoError(t, err)
for _, item := range itemList.Items {
@@ -1828,7 +1854,11 @@ func TestGetField(t *testing.T) {
func TestWatchMany(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
loi, err := makeListOptionIndexer(ctx, [][]string{{"metadata", "somefield"}})
opts := ListOptionIndexerOptions{
Fields: [][]string{{"metadata", "somefield"}},
IsNamespaced: true,
}
loi, err := makeListOptionIndexer(ctx, opts)
assert.NoError(t, err)
startWatcher := func(ctx context.Context) (chan watch.Event, chan error) {
@@ -2076,7 +2106,11 @@ func TestWatchFilter(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
loi, err := makeListOptionIndexer(ctx, [][]string{{"metadata", "somefield"}})
opts := ListOptionIndexerOptions{
Fields: [][]string{{"metadata", "somefield"}},
IsNamespaced: true,
}
loi, err := makeListOptionIndexer(ctx, opts)
assert.NoError(t, err)
wCh, errCh := startWatcher(ctx, loi, WatchFilter{
@@ -2166,7 +2200,10 @@ func TestWatchResourceVersion(t *testing.T) {
parentCtx := context.Background()
loi, err := makeListOptionIndexer(parentCtx, [][]string{})
opts := ListOptionIndexerOptions{
IsNamespaced: true,
}
loi, err := makeListOptionIndexer(parentCtx, opts)
assert.NoError(t, err)
getRV := func(t *testing.T) string {
@@ -2261,3 +2298,154 @@ func TestWatchResourceVersion(t *testing.T) {
})
}
}
func TestWatchGarbageCollection(t *testing.T) {
startWatcher := func(ctx context.Context, loi *ListOptionIndexer, rv string) (chan watch.Event, chan error) {
errCh := make(chan error, 1)
eventsCh := make(chan watch.Event, 100)
go func() {
watchErr := loi.Watch(ctx, WatchOptions{ResourceVersion: rv}, eventsCh)
errCh <- watchErr
}()
time.Sleep(100 * time.Millisecond)
return eventsCh, errCh
}
waitStopWatcher := func(errCh chan error) error {
select {
case <-time.After(time.Second * 5):
return fmt.Errorf("not finished in time")
case err := <-errCh:
return err
}
}
receiveEvents := func(eventsCh chan watch.Event) []watch.Event {
timer := time.NewTimer(time.Millisecond * 50)
var events []watch.Event
for {
select {
case <-timer.C:
return events
case ev := <-eventsCh:
events = append(events, ev)
}
}
}
foo := &unstructured.Unstructured{}
foo.SetResourceVersion("100")
foo.SetName("foo")
fooUpdated := foo.DeepCopy()
fooUpdated.SetResourceVersion("120")
bar := &unstructured.Unstructured{}
bar.SetResourceVersion("150")
bar.SetName("bar")
barNew := &unstructured.Unstructured{}
barNew.SetResourceVersion("160")
barNew.SetName("bar")
parentCtx := context.Background()
opts := ListOptionIndexerOptions{
MaximumEventsCount: 2,
}
loi, err := makeListOptionIndexer(parentCtx, opts)
assert.NoError(t, err)
getRV := func(t *testing.T) string {
t.Helper()
list, _, _, err := loi.ListByOptions(parentCtx, &sqltypes.ListOptions{}, []partition.Partition{{All: true}}, "")
assert.NoError(t, err)
return list.GetResourceVersion()
}
err = loi.Add(foo)
assert.NoError(t, err)
rv1 := getRV(t)
err = loi.Update(fooUpdated)
assert.NoError(t, err)
rv2 := getRV(t)
err = loi.Add(bar)
assert.NoError(t, err)
rv3 := getRV(t)
err = loi.Delete(bar)
assert.NoError(t, err)
rv4 := getRV(t)
for _, rv := range []string{rv1, rv2} {
watcherCh, errCh := startWatcher(parentCtx, loi, rv)
gotEvents := receiveEvents(watcherCh)
err = waitStopWatcher(errCh)
assert.Empty(t, gotEvents)
assert.ErrorIs(t, err, ErrTooOld)
}
tests := []struct {
rv string
expectedEvents []watch.Event
}{
{
rv: rv3,
expectedEvents: []watch.Event{
{Type: watch.Deleted, Object: bar},
},
},
{
rv: rv4,
expectedEvents: nil,
},
}
for _, test := range tests {
ctx, cancel := context.WithCancel(parentCtx)
watcherCh, errCh := startWatcher(ctx, loi, test.rv)
gotEvents := receiveEvents(watcherCh)
cancel()
err = waitStopWatcher(errCh)
assert.Equal(t, test.expectedEvents, gotEvents)
assert.NoError(t, err)
}
err = loi.Add(barNew)
assert.NoError(t, err)
rv5 := getRV(t)
for _, rv := range []string{rv1, rv2, rv3} {
watcherCh, errCh := startWatcher(parentCtx, loi, rv)
gotEvents := receiveEvents(watcherCh)
err = waitStopWatcher(errCh)
assert.Empty(t, gotEvents)
assert.ErrorIs(t, err, ErrTooOld)
}
tests = []struct {
rv string
expectedEvents []watch.Event
}{
{
rv: rv4,
expectedEvents: []watch.Event{
{Type: watch.Added, Object: barNew},
},
},
{
rv: rv5,
expectedEvents: nil,
},
}
for _, test := range tests {
ctx, cancel := context.WithCancel(parentCtx)
watcherCh, errCh := startWatcher(ctx, loi, test.rv)
gotEvents := receiveEvents(watcherCh)
cancel()
err = waitStopWatcher(errCh)
assert.Equal(t, test.expectedEvents, gotEvents)
assert.NoError(t, err)
}
}

View File

@@ -304,7 +304,7 @@ func (i *IntegrationSuite) TestSQLCacheFilters() {
}
func (i *IntegrationSuite) createCacheAndFactory(fields [][]string, transformFunc cache.TransformFunc) (*factory.Cache, *factory.CacheFactory, error) {
cacheFactory, err := factory.NewCacheFactory()
cacheFactory, err := factory.NewCacheFactory(factory.CacheFactoryOptions{})
if err != nil {
return nil, nil, fmt.Errorf("unable to make factory: %w", err)
}