diff --git a/pkg/server/server.go b/pkg/server/server.go index a2ffb1d5..014bbb1f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -3,6 +3,7 @@ package server import ( "context" "errors" + "fmt" "net/http" apiserver "github.com/rancher/apiserver/pkg/server" @@ -22,6 +23,7 @@ import ( "github.com/rancher/steve/pkg/schema/definitions" "github.com/rancher/steve/pkg/server/handler" "github.com/rancher/steve/pkg/server/router" + "github.com/rancher/steve/pkg/sqlcache/informer/factory" metricsStore "github.com/rancher/steve/pkg/stores/metrics" "github.com/rancher/steve/pkg/stores/proxy" "github.com/rancher/steve/pkg/stores/sqlpartition" @@ -57,6 +59,8 @@ type Server struct { ClusterRegistry string Version string + cacheFactory *factory.CacheFactory + extensionAPIServer ExtensionAPIServer authMiddleware auth.Middleware @@ -85,6 +89,8 @@ type Options struct { // SQLCache enables the SQLite-based caching mechanism SQLCache bool + SQLCacheFactoryOptions factory.CacheFactoryOptions + // ExtensionAPIServer enables an extension API server that will be served // under /ext // If nil, Steve's default http handler for unknown routes will be served. @@ -99,6 +105,15 @@ func New(ctx context.Context, restConfig *rest.Config, opts *Options) (*Server, opts = &Options{} } + var cacheFactory *factory.CacheFactory + if opts.SQLCache { + var err error + cacheFactory, err = factory.NewCacheFactory(opts.SQLCacheFactoryOptions) + if err != nil { + return nil, fmt.Errorf("creating SQL cache factory: %w", err) + } + } + server := &Server{ RESTConfig: restConfig, ClientFactory: opts.ClientFactory, @@ -113,6 +128,7 @@ func New(ctx context.Context, restConfig *rest.Config, opts *Options) (*Server, Version: opts.ServerVersion, // SQLCache enables the SQLite-based lasso caching mechanism SQLCache: opts.SQLCache, + cacheFactory: cacheFactory, extensionAPIServer: opts.ExtensionAPIServer, } @@ -187,7 +203,7 @@ func setup(ctx context.Context, server *Server) error { var onSchemasHandler schemacontroller.SchemasHandlerFunc if server.SQLCache { - s, err := sqlproxy.NewProxyStore(ctx, cols, cf, summaryCache, summaryCache, nil) + s, err := sqlproxy.NewProxyStore(ctx, cols, cf, summaryCache, summaryCache, server.cacheFactory) if err != nil { panic(err) } diff --git a/pkg/sqlcache/informer/factory/informer_factory.go b/pkg/sqlcache/informer/factory/informer_factory.go index 0f1fb6ad..18b2a962 100644 --- a/pkg/sqlcache/informer/factory/informer_factory.go +++ b/pkg/sqlcache/informer/factory/informer_factory.go @@ -33,6 +33,9 @@ type CacheFactory struct { mutex sync.RWMutex encryptAll bool + defaultMaximumEventsCount int + perGVKMaximumEventsCount map[schema.GroupVersionKind]int + newInformer newInformer informers map[schema.GroupVersionKind]*guardedInformer @@ -44,7 +47,7 @@ type guardedInformer struct { mutex *sync.Mutex } -type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool) (*informer.Informer, error) +type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool, maxEventsCount int) (*informer.Informer, error) type Cache struct { informer.ByOptionsLister @@ -62,9 +65,25 @@ var defaultEncryptedResourceTypes = map[schema.GroupVersionKind]struct{}{ }: {}, } +type CacheFactoryOptions struct { + // DefaultMaximumEventsCount is the maximum number of events to keep in + // the events table by default. + // + // Use PerGVKMaximumEventsCount if you want to set a different value for + // a specific GVK. + // + // A value of 0 means no limits. + DefaultMaximumEventsCount int + // PerGVKMaximumEventsCount is the maximum number of events to keep in + // the events table for specific GVKs. + // + // A value of 0 means no limits. + PerGVKMaximumEventsCount map[schema.GroupVersionKind]int +} + // NewCacheFactory returns an informer factory instance // This is currently called from steve via initial calls to `s.cacheFactory.CacheFor(...)` -func NewCacheFactory() (*CacheFactory, error) { +func NewCacheFactory(opts CacheFactoryOptions) (*CacheFactory, error) { m, err := encryption.NewManager() if err != nil { return nil, err @@ -74,10 +93,14 @@ func NewCacheFactory() (*CacheFactory, error) { return nil, err } return &CacheFactory{ - wg: wait.Group{}, - stopCh: make(chan struct{}), - encryptAll: os.Getenv(EncryptAllEnvVar) == "true", - dbClient: dbClient, + wg: wait.Group{}, + stopCh: make(chan struct{}), + encryptAll: os.Getenv(EncryptAllEnvVar) == "true", + dbClient: dbClient, + + defaultMaximumEventsCount: opts.DefaultMaximumEventsCount, + perGVKMaximumEventsCount: opts.PerGVKMaximumEventsCount, + newInformer: informer.NewInformer, informers: map[schema.GroupVersionKind]*guardedInformer{}, }, nil @@ -121,7 +144,8 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, transfor _, encryptResourceAlways := defaultEncryptedResourceTypes[gvk] shouldEncrypt := f.encryptAll || encryptResourceAlways - i, err := f.newInformer(ctx, client, fields, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable) + maxEventsCount := f.getMaximumEventsCount(gvk) + i, err := f.newInformer(ctx, client, fields, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, maxEventsCount) if err != nil { return Cache{}, err } @@ -150,6 +174,13 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, transfor return Cache{ByOptionsLister: gi.informer}, nil } +func (f *CacheFactory) getMaximumEventsCount(gvk schema.GroupVersionKind) int { + if maxCount, ok := f.perGVKMaximumEventsCount[gvk]; ok { + return maxCount + } + return f.defaultMaximumEventsCount +} + // Reset closes the stopCh which stops any running informers, assigns a new stopCh, resets the GVK-informer cache, and resets // the database connection which wipes any current sqlite database at the default location. func (f *CacheFactory) Reset() error { diff --git a/pkg/sqlcache/informer/factory/informer_factory_test.go b/pkg/sqlcache/informer/factory/informer_factory_test.go index 8dee2da4..f63aec15 100644 --- a/pkg/sqlcache/informer/factory/informer_factory_test.go +++ b/pkg/sqlcache/informer/factory/informer_factory_test.go @@ -30,7 +30,7 @@ func TestNewCacheFactory(t *testing.T) { var tests []testCase tests = append(tests, testCase{description: "NewCacheFactory() with no errors returned, should return no errors", test: func(t *testing.T) { - f, err := NewCacheFactory() + f, err := NewCacheFactory(CacheFactoryOptions{}) assert.Nil(t, err) assert.NotNil(t, f.dbClient) assert.False(t, f.encryptAll) @@ -38,7 +38,7 @@ func TestNewCacheFactory(t *testing.T) { tests = append(tests, testCase{description: "NewCacheFactory() with no errors returned and EncryptAllEnvVar set to true, should return no errors and have encryptAll set to true", test: func(t *testing.T) { err := os.Setenv(EncryptAllEnvVar, "true") assert.Nil(t, err) - f, err := NewCacheFactory() + f, err := NewCacheFactory(CacheFactoryOptions{}) assert.Nil(t, err) assert.Nil(t, err) assert.NotNil(t, f.dbClient) @@ -74,12 +74,13 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) + assert.Equal(t, 0, maxEventsCount) return i, nil } f := &CacheFactory{ @@ -119,12 +120,13 @@ func TestCacheFor(t *testing.T) { // need to set this so Run function is not nil SharedIndexInformer: sii, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) + assert.Equal(t, 0, maxEventsCount) return expectedI, nil } f := &CacheFactory{ @@ -161,12 +163,13 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) + assert.Equal(t, 0, maxEventsCount) return i, nil } f := &CacheFactory{ @@ -200,12 +203,13 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) + assert.Equal(t, 0, maxEventsCount) return i, nil } f := &CacheFactory{ @@ -248,12 +252,13 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) + assert.Equal(t, 0, maxEventsCount) return i, nil } f := &CacheFactory{ @@ -295,12 +300,13 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced, watchable bool, maxEventsCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) assert.Equal(t, fields, fields) assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, true, shouldEncrypt) + assert.Equal(t, 0, maxEventsCount) return i, nil } f := &CacheFactory{ @@ -342,7 +348,7 @@ func TestCacheFor(t *testing.T) { expectedC := Cache{ ByOptionsLister: i, } - testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*informer.Informer, error) { + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { // we can't test func == func, so instead we check if the output was as expected input := "someinput" ouput, err := transform(input) @@ -356,6 +362,7 @@ func TestCacheFor(t *testing.T) { assert.Equal(t, expectedGVK, gvk) assert.Equal(t, db, dbClient) assert.Equal(t, false, shouldEncrypt) + assert.Equal(t, 0, maxEventsCount) return i, nil } f := &CacheFactory{ @@ -377,6 +384,103 @@ func TestCacheFor(t *testing.T) { assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) }}) + tests = append(tests, testCase{description: "CacheFor() with default max events count", test: func(t *testing.T) { + dbClient := NewMockClient(gomock.NewController(t)) + dynamicClient := NewMockResourceInterface(gomock.NewController(t)) + fields := [][]string{{"something"}} + expectedGVK := schema.GroupVersionKind{} + sii := NewMockSharedIndexInformer(gomock.NewController(t)) + sii.EXPECT().HasSynced().Return(true) + sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes() + sii.EXPECT().SetWatchErrorHandler(gomock.Any()) + i := &informer.Informer{ + // need to set this so Run function is not nil + SharedIndexInformer: sii, + } + expectedC := Cache{ + ByOptionsLister: i, + } + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + assert.Equal(t, client, dynamicClient) + assert.Equal(t, fields, fields) + assert.Equal(t, expectedGVK, gvk) + assert.Equal(t, db, dbClient) + assert.Equal(t, true, shouldEncrypt) + assert.Equal(t, 10, maxEventsCount) + return i, nil + } + f := &CacheFactory{ + defaultMaximumEventsCount: 10, + dbClient: dbClient, + stopCh: make(chan struct{}), + newInformer: testNewInformer, + encryptAll: true, + informers: map[schema.GroupVersionKind]*guardedInformer{}, + } + + go func() { + time.Sleep(10 * time.Second) + close(f.stopCh) + }() + var c Cache + var err error + c, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true) + assert.Nil(t, err) + assert.Equal(t, expectedC, c) + time.Sleep(1 * time.Second) + }}) + tests = append(tests, testCase{description: "CacheFor() with per GVK maximum events count", test: func(t *testing.T) { + dbClient := NewMockClient(gomock.NewController(t)) + dynamicClient := NewMockResourceInterface(gomock.NewController(t)) + fields := [][]string{{"something"}} + expectedGVK := schema.GroupVersionKind{ + Group: "management.cattle.io", + Version: "v3", + Kind: "Token", + } + sii := NewMockSharedIndexInformer(gomock.NewController(t)) + sii.EXPECT().HasSynced().Return(true) + sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes() + sii.EXPECT().SetWatchErrorHandler(gomock.Any()) + i := &informer.Informer{ + // need to set this so Run function is not nil + SharedIndexInformer: sii, + } + expectedC := Cache{ + ByOptionsLister: i, + } + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt, namespaced bool, watchable bool, maxEventsCount int) (*informer.Informer, error) { + assert.Equal(t, client, dynamicClient) + assert.Equal(t, fields, fields) + assert.Equal(t, expectedGVK, gvk) + assert.Equal(t, db, dbClient) + assert.Equal(t, true, shouldEncrypt) + assert.Equal(t, 10, maxEventsCount) + return i, nil + } + f := &CacheFactory{ + defaultMaximumEventsCount: 5, + perGVKMaximumEventsCount: map[schema.GroupVersionKind]int{ + expectedGVK: 10, + }, + dbClient: dbClient, + stopCh: make(chan struct{}), + newInformer: testNewInformer, + encryptAll: true, + informers: map[schema.GroupVersionKind]*guardedInformer{}, + } + + go func() { + time.Sleep(10 * time.Second) + close(f.stopCh) + }() + var c Cache + var err error + c, err = f.CacheFor(context.Background(), fields, nil, dynamicClient, expectedGVK, false, true) + assert.Nil(t, err) + assert.Equal(t, expectedC, c) + time.Sleep(1 * time.Second) + }}) t.Parallel() for _, test := range tests { t.Run(test.description, func(t *testing.T) { test.test(t) }) diff --git a/pkg/sqlcache/informer/informer.go b/pkg/sqlcache/informer/informer.go index 1cb262fc..026b49ee 100644 --- a/pkg/sqlcache/informer/informer.go +++ b/pkg/sqlcache/informer/informer.go @@ -52,7 +52,7 @@ var newInformer = cache.NewSharedIndexInformer // NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form // using the specified client -func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool) (*Informer, error) { +func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, maxEventsCount int) (*Informer, error) { watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { return client.Watch(ctx, options) } @@ -101,7 +101,13 @@ func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [ if err != nil { return nil, err } - loi, err := NewListOptionIndexer(ctx, fields, s, namespaced) + + opts := ListOptionIndexerOptions{ + Fields: fields, + IsNamespaced: namespaced, + MaximumEventsCount: maxEventsCount, + } + loi, err := NewListOptionIndexer(ctx, s, opts) if err != nil { return nil, err } diff --git a/pkg/sqlcache/informer/informer_test.go b/pkg/sqlcache/informer/informer_test.go index c152dbc6..abc8bf6d 100644 --- a/pkg/sqlcache/informer/informer_test.go +++ b/pkg/sqlcache/informer/informer_test.go @@ -81,7 +81,7 @@ func TestNewInformer(t *testing.T) { } }) - informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true) + informer, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0) assert.Nil(t, err) assert.NotNil(t, informer.ByOptionsLister) assert.NotNil(t, informer.SharedIndexInformer) @@ -105,7 +105,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with errors returned from NewIndexer(), should return an error", test: func(t *testing.T) { @@ -140,7 +140,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with errors returned from NewListOptionIndexer(), should return an error", test: func(t *testing.T) { @@ -193,7 +193,7 @@ func TestNewInformer(t *testing.T) { } }) - _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true) + _, err := NewInformer(context.Background(), dynamicClient, fields, nil, gvk, dbClient, false, true, true, 0) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewInformer() with transform func", test: func(t *testing.T) { @@ -257,7 +257,7 @@ func TestNewInformer(t *testing.T) { transformFunc := func(input interface{}) (interface{}, error) { return "someoutput", nil } - informer, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true) + informer, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true, 0) assert.Nil(t, err) assert.NotNil(t, informer.ByOptionsLister) assert.NotNil(t, informer.SharedIndexInformer) @@ -293,7 +293,7 @@ func TestNewInformer(t *testing.T) { transformFunc := func(input interface{}) (interface{}, error) { return "someoutput", nil } - _, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true) + _, err := NewInformer(context.Background(), dynamicClient, fields, transformFunc, gvk, dbClient, false, true, true, 0) assert.Error(t, err) newInformer = cache.NewSharedIndexInformer }}) diff --git a/pkg/sqlcache/informer/listoption_indexer.go b/pkg/sqlcache/informer/listoption_indexer.go index e8bfd0e0..bdef66cc 100644 --- a/pkg/sqlcache/informer/listoption_indexer.go +++ b/pkg/sqlcache/informer/listoption_indexer.go @@ -35,31 +35,36 @@ type ListOptionIndexer struct { namespaced bool indexedFields []string + // maximumEventsCount is how many events to keep. 0 means keep all events. + maximumEventsCount int + latestRVLock sync.RWMutex latestRV string watchersLock sync.RWMutex watchers map[*watchKey]*watcher - upsertEventsQuery string - findEventsRowByRVQuery string - listEventsAfterQuery string - addFieldsQuery string - deleteFieldsByKeyQuery string - deleteFieldsQuery string - upsertLabelsQuery string - deleteLabelsByKeyQuery string - deleteLabelsQuery string + upsertEventsQuery string + findEventsRowByRVQuery string + listEventsAfterQuery string + deleteEventsByCountQuery string + addFieldsQuery string + deleteFieldsByKeyQuery string + deleteFieldsQuery string + upsertLabelsQuery string + deleteLabelsByKeyQuery string + deleteLabelsQuery string - upsertEventsStmt *sql.Stmt - findEventsRowByRVStmt *sql.Stmt - listEventsAfterStmt *sql.Stmt - addFieldsStmt *sql.Stmt - deleteFieldsByKeyStmt *sql.Stmt - deleteFieldsStmt *sql.Stmt - upsertLabelsStmt *sql.Stmt - deleteLabelsByKeyStmt *sql.Stmt - deleteLabelsStmt *sql.Stmt + upsertEventsStmt *sql.Stmt + findEventsRowByRVStmt *sql.Stmt + listEventsAfterStmt *sql.Stmt + deleteEventsByCountStmt *sql.Stmt + addFieldsStmt *sql.Stmt + deleteFieldsByKeyStmt *sql.Stmt + deleteFieldsStmt *sql.Stmt + upsertLabelsStmt *sql.Stmt + deleteLabelsByKeyStmt *sql.Stmt + deleteLabelsStmt *sql.Stmt } var ( @@ -91,6 +96,11 @@ const ( FROM "%s_events" WHERE rv = ? ` + deleteEventsByCountFmt = `DELETE FROM "%s_events" + WHERE rowid + NOT IN ( + SELECT rowid FROM "%s_events" ORDER BY rowid DESC LIMIT ? + )` createFieldsTableFmt = `CREATE TABLE "%s_fields" ( key TEXT NOT NULL PRIMARY KEY, @@ -114,10 +124,25 @@ const ( deleteLabelsStmtFmt = `DELETE FROM "%s_labels"` ) +type ListOptionIndexerOptions struct { + // Fields is a list of fields within the object that we want indexed for + // filtering & sorting. Each field is specified as a slice. + // + // For example, .metadata.resourceVersion should be specified as []string{"metadata", "resourceVersion"} + Fields [][]string + // IsNamespaced determines whether the GVK for this ListOptionIndexer is + // namespaced + IsNamespaced bool + // MaximumEventsCount is the maximum number of events we want to keep + // in the _events table. + // + // Zero means never delete events. + MaximumEventsCount int +} + // NewListOptionIndexer returns a SQLite-backed cache.Indexer of unstructured.Unstructured Kubernetes resources of a certain GVK // ListOptionIndexer is also able to satisfy ListOption queries on indexed (sub)fields. -// Fields are specified as slices (e.g. "metadata.resourceVersion" is ["metadata", "resourceVersion"]) -func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, namespaced bool) (*ListOptionIndexer, error) { +func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOptions) (*ListOptionIndexer, error) { // necessary in order to gob/ungob unstructured.Unstructured objects gob.Register(map[string]interface{}{}) gob.Register([]interface{}{}) @@ -131,28 +156,32 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names for _, f := range defaultIndexedFields { indexedFields = append(indexedFields, f) } - if namespaced { + if opts.IsNamespaced { indexedFields = append(indexedFields, defaultIndexNamespaced) } - for _, f := range fields { + for _, f := range opts.Fields { indexedFields = append(indexedFields, toColumnName(f)) } l := &ListOptionIndexer{ - Indexer: i, - namespaced: namespaced, - indexedFields: indexedFields, - watchers: make(map[*watchKey]*watcher), + Indexer: i, + namespaced: opts.IsNamespaced, + indexedFields: indexedFields, + maximumEventsCount: opts.MaximumEventsCount, + watchers: make(map[*watchKey]*watcher), } l.RegisterAfterAdd(l.addIndexFields) l.RegisterAfterAdd(l.addLabels) l.RegisterAfterAdd(l.notifyEventAdded) + l.RegisterAfterAdd(l.deleteOldEvents) l.RegisterAfterUpdate(l.addIndexFields) l.RegisterAfterUpdate(l.addLabels) l.RegisterAfterUpdate(l.notifyEventModified) + l.RegisterAfterUpdate(l.deleteOldEvents) l.RegisterAfterDelete(l.deleteFieldsByKey) l.RegisterAfterDelete(l.deleteLabelsByKey) l.RegisterAfterDelete(l.notifyEventDeleted) + l.RegisterAfterDelete(l.deleteOldEvents) l.RegisterAfterDeleteAll(l.deleteFields) l.RegisterAfterDeleteAll(l.deleteLabels) columnDefs := make([]string, len(indexedFields)) @@ -226,6 +255,9 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names l.findEventsRowByRVQuery = fmt.Sprintf(findEventsRowByRVFmt, dbName) l.findEventsRowByRVStmt = l.Prepare(l.findEventsRowByRVQuery) + l.deleteEventsByCountQuery = fmt.Sprintf(deleteEventsByCountFmt, dbName, dbName) + l.deleteEventsByCountStmt = l.Prepare(l.deleteEventsByCountQuery) + l.addFieldsQuery = fmt.Sprintf( `INSERT INTO "%s_fields"(key, %s) VALUES (?, %s) ON CONFLICT DO UPDATE SET %s`, dbName, @@ -450,6 +482,18 @@ func (l *ListOptionIndexer) notifyEvent(eventType watch.EventType, oldObj any, o return nil } +func (l *ListOptionIndexer) deleteOldEvents(key string, obj any, tx transaction.Client) error { + if l.maximumEventsCount == 0 { + return nil + } + + _, err := tx.Stmt(l.deleteEventsByCountStmt).Exec(l.maximumEventsCount) + if err != nil { + return &db.QueryError{QueryString: l.deleteEventsByCountQuery, Err: err} + } + return nil +} + // addIndexFields saves sortable/filterable fields into tables func (l *ListOptionIndexer) addIndexFields(key string, obj any, tx transaction.Client) error { args := []any{key} diff --git a/pkg/sqlcache/informer/listoption_indexer_test.go b/pkg/sqlcache/informer/listoption_indexer_test.go index 97d05ea4..d70a4b09 100644 --- a/pkg/sqlcache/informer/listoption_indexer_test.go +++ b/pkg/sqlcache/informer/listoption_indexer_test.go @@ -30,7 +30,7 @@ import ( "k8s.io/client-go/tools/cache" ) -func makeListOptionIndexer(ctx context.Context, fields [][]string) (*ListOptionIndexer, error) { +func makeListOptionIndexer(ctx context.Context, opts ListOptionIndexerOptions) (*ListOptionIndexer, error) { gvk := schema.GroupVersionKind{ Group: "", Version: "v1", @@ -54,7 +54,7 @@ func makeListOptionIndexer(ctx context.Context, fields [][]string) (*ListOptionI return nil, err } - listOptionIndexer, err := NewListOptionIndexer(ctx, fields, s, true) + listOptionIndexer, err := NewListOptionIndexer(ctx, s, opts) if err != nil { return nil, err } @@ -91,9 +91,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) // create events table @@ -115,7 +115,11 @@ func TestNewListOptionIndexer(t *testing.T) { } }) - loi, err := NewListOptionIndexer(context.Background(), fields, store, true) + opts := ListOptionIndexerOptions{ + Fields: fields, + IsNamespaced: true, + } + loi, err := NewListOptionIndexer(context.Background(), store, opts) assert.Nil(t, err) assert.NotNil(t, loi) }}) @@ -136,7 +140,10 @@ func TestNewListOptionIndexer(t *testing.T) { } }) - _, err := NewListOptionIndexer(context.Background(), fields, store, false) + opts := ListOptionIndexerOptions{ + Fields: fields, + } + _, err := NewListOptionIndexer(context.Background(), store, opts) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewListOptionIndexer() with error returned from Begin(), should return an error", test: func(t *testing.T) { @@ -161,14 +168,17 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) store.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(fmt.Errorf("error")) - _, err := NewListOptionIndexer(context.Background(), fields, store, false) + opts := ListOptionIndexerOptions{ + Fields: fields, + } + _, err := NewListOptionIndexer(context.Background(), store, opts) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewListOptionIndexer() with error from Exec() when creating fields table, should return an error", test: func(t *testing.T) { @@ -193,9 +203,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) @@ -209,7 +219,11 @@ func TestNewListOptionIndexer(t *testing.T) { } }) - _, err := NewListOptionIndexer(context.Background(), fields, store, true) + opts := ListOptionIndexerOptions{ + Fields: fields, + IsNamespaced: true, + } + _, err := NewListOptionIndexer(context.Background(), store, opts) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewListOptionIndexer() with error from create-labels, should return an error", test: func(t *testing.T) { @@ -234,9 +248,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) @@ -254,7 +268,11 @@ func TestNewListOptionIndexer(t *testing.T) { } }) - _, err := NewListOptionIndexer(context.Background(), fields, store, true) + opts := ListOptionIndexerOptions{ + Fields: fields, + IsNamespaced: true, + } + _, err := NewListOptionIndexer(context.Background(), store, opts) assert.NotNil(t, err) }}) tests = append(tests, testCase{description: "NewListOptionIndexer() with error from Commit(), should return an error", test: func(t *testing.T) { @@ -279,9 +297,9 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().Prepare(gomock.Any()).Return(stmt).AnyTimes() // end NewIndexer() logic - store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(3) - store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) - store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) + store.EXPECT().RegisterAfterAdd(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(4) + store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(4) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) @@ -300,7 +318,11 @@ func TestNewListOptionIndexer(t *testing.T) { } }) - _, err := NewListOptionIndexer(context.Background(), fields, store, true) + opts := ListOptionIndexerOptions{ + Fields: fields, + IsNamespaced: true, + } + _, err := NewListOptionIndexer(context.Background(), store, opts) assert.NotNil(t, err) }}) @@ -894,7 +916,11 @@ func TestNewListOptionIndexerEasy(t *testing.T) { } fields = append(fields, test.extraIndexedFields...) - loi, err := makeListOptionIndexer(ctx, fields) + opts := ListOptionIndexerOptions{ + Fields: fields, + IsNamespaced: true, + } + loi, err := makeListOptionIndexer(ctx, opts) assert.NoError(t, err) for _, item := range itemList.Items { @@ -1828,7 +1854,11 @@ func TestGetField(t *testing.T) { func TestWatchMany(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - loi, err := makeListOptionIndexer(ctx, [][]string{{"metadata", "somefield"}}) + opts := ListOptionIndexerOptions{ + Fields: [][]string{{"metadata", "somefield"}}, + IsNamespaced: true, + } + loi, err := makeListOptionIndexer(ctx, opts) assert.NoError(t, err) startWatcher := func(ctx context.Context) (chan watch.Event, chan error) { @@ -2076,7 +2106,11 @@ func TestWatchFilter(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - loi, err := makeListOptionIndexer(ctx, [][]string{{"metadata", "somefield"}}) + opts := ListOptionIndexerOptions{ + Fields: [][]string{{"metadata", "somefield"}}, + IsNamespaced: true, + } + loi, err := makeListOptionIndexer(ctx, opts) assert.NoError(t, err) wCh, errCh := startWatcher(ctx, loi, WatchFilter{ @@ -2166,7 +2200,10 @@ func TestWatchResourceVersion(t *testing.T) { parentCtx := context.Background() - loi, err := makeListOptionIndexer(parentCtx, [][]string{}) + opts := ListOptionIndexerOptions{ + IsNamespaced: true, + } + loi, err := makeListOptionIndexer(parentCtx, opts) assert.NoError(t, err) getRV := func(t *testing.T) string { @@ -2261,3 +2298,154 @@ func TestWatchResourceVersion(t *testing.T) { }) } } + +func TestWatchGarbageCollection(t *testing.T) { + startWatcher := func(ctx context.Context, loi *ListOptionIndexer, rv string) (chan watch.Event, chan error) { + errCh := make(chan error, 1) + eventsCh := make(chan watch.Event, 100) + go func() { + watchErr := loi.Watch(ctx, WatchOptions{ResourceVersion: rv}, eventsCh) + errCh <- watchErr + }() + time.Sleep(100 * time.Millisecond) + return eventsCh, errCh + } + + waitStopWatcher := func(errCh chan error) error { + select { + case <-time.After(time.Second * 5): + return fmt.Errorf("not finished in time") + case err := <-errCh: + return err + } + } + + receiveEvents := func(eventsCh chan watch.Event) []watch.Event { + timer := time.NewTimer(time.Millisecond * 50) + var events []watch.Event + for { + select { + case <-timer.C: + return events + case ev := <-eventsCh: + events = append(events, ev) + } + } + } + + foo := &unstructured.Unstructured{} + foo.SetResourceVersion("100") + foo.SetName("foo") + + fooUpdated := foo.DeepCopy() + fooUpdated.SetResourceVersion("120") + + bar := &unstructured.Unstructured{} + bar.SetResourceVersion("150") + bar.SetName("bar") + + barNew := &unstructured.Unstructured{} + barNew.SetResourceVersion("160") + barNew.SetName("bar") + + parentCtx := context.Background() + + opts := ListOptionIndexerOptions{ + MaximumEventsCount: 2, + } + loi, err := makeListOptionIndexer(parentCtx, opts) + assert.NoError(t, err) + + getRV := func(t *testing.T) string { + t.Helper() + list, _, _, err := loi.ListByOptions(parentCtx, &sqltypes.ListOptions{}, []partition.Partition{{All: true}}, "") + assert.NoError(t, err) + return list.GetResourceVersion() + } + + err = loi.Add(foo) + assert.NoError(t, err) + rv1 := getRV(t) + + err = loi.Update(fooUpdated) + assert.NoError(t, err) + rv2 := getRV(t) + + err = loi.Add(bar) + assert.NoError(t, err) + rv3 := getRV(t) + + err = loi.Delete(bar) + assert.NoError(t, err) + rv4 := getRV(t) + + for _, rv := range []string{rv1, rv2} { + watcherCh, errCh := startWatcher(parentCtx, loi, rv) + gotEvents := receiveEvents(watcherCh) + err = waitStopWatcher(errCh) + assert.Empty(t, gotEvents) + assert.ErrorIs(t, err, ErrTooOld) + } + + tests := []struct { + rv string + expectedEvents []watch.Event + }{ + { + rv: rv3, + expectedEvents: []watch.Event{ + {Type: watch.Deleted, Object: bar}, + }, + }, + { + rv: rv4, + expectedEvents: nil, + }, + } + for _, test := range tests { + ctx, cancel := context.WithCancel(parentCtx) + watcherCh, errCh := startWatcher(ctx, loi, test.rv) + gotEvents := receiveEvents(watcherCh) + cancel() + err = waitStopWatcher(errCh) + assert.Equal(t, test.expectedEvents, gotEvents) + assert.NoError(t, err) + } + + err = loi.Add(barNew) + assert.NoError(t, err) + rv5 := getRV(t) + + for _, rv := range []string{rv1, rv2, rv3} { + watcherCh, errCh := startWatcher(parentCtx, loi, rv) + gotEvents := receiveEvents(watcherCh) + err = waitStopWatcher(errCh) + assert.Empty(t, gotEvents) + assert.ErrorIs(t, err, ErrTooOld) + } + + tests = []struct { + rv string + expectedEvents []watch.Event + }{ + { + rv: rv4, + expectedEvents: []watch.Event{ + {Type: watch.Added, Object: barNew}, + }, + }, + { + rv: rv5, + expectedEvents: nil, + }, + } + for _, test := range tests { + ctx, cancel := context.WithCancel(parentCtx) + watcherCh, errCh := startWatcher(ctx, loi, test.rv) + gotEvents := receiveEvents(watcherCh) + cancel() + err = waitStopWatcher(errCh) + assert.Equal(t, test.expectedEvents, gotEvents) + assert.NoError(t, err) + } +} diff --git a/pkg/sqlcache/integration_test.go b/pkg/sqlcache/integration_test.go index f88102e9..e9c36e8e 100644 --- a/pkg/sqlcache/integration_test.go +++ b/pkg/sqlcache/integration_test.go @@ -304,7 +304,7 @@ func (i *IntegrationSuite) TestSQLCacheFilters() { } func (i *IntegrationSuite) createCacheAndFactory(fields [][]string, transformFunc cache.TransformFunc) (*factory.Cache, *factory.CacheFactory, error) { - cacheFactory, err := factory.NewCacheFactory() + cacheFactory, err := factory.NewCacheFactory(factory.CacheFactoryOptions{}) if err != nil { return nil, nil, fmt.Errorf("unable to make factory: %w", err) } diff --git a/pkg/stores/sqlproxy/proxy_store.go b/pkg/stores/sqlproxy/proxy_store.go index 8e071a41..b63c0cee 100644 --- a/pkg/stores/sqlproxy/proxy_store.go +++ b/pkg/stores/sqlproxy/proxy_store.go @@ -303,7 +303,7 @@ func (s *Store) Reset() error { } func defaultInitializeCacheFactory() (CacheFactory, error) { - informerFactory, err := factory.NewCacheFactory() + informerFactory, err := factory.NewCacheFactory(factory.CacheFactoryOptions{}) if err != nil { return nil, err } diff --git a/pkg/stores/sqlproxy/proxy_store_test.go b/pkg/stores/sqlproxy/proxy_store_test.go index e4605e1f..f7c49a43 100644 --- a/pkg/stores/sqlproxy/proxy_store_test.go +++ b/pkg/stores/sqlproxy/proxy_store_test.go @@ -14,12 +14,9 @@ import ( "github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/resources/common" - "github.com/rancher/steve/pkg/sqlcache/db" - "github.com/rancher/steve/pkg/sqlcache/encryption" "github.com/rancher/steve/pkg/sqlcache/informer" "github.com/rancher/steve/pkg/sqlcache/informer/factory" "github.com/rancher/steve/pkg/sqlcache/partition" - "github.com/rancher/steve/pkg/sqlcache/store" "github.com/rancher/steve/pkg/stores/sqlpartition/listprocessor" "github.com/rancher/steve/pkg/stores/sqlproxy/tablelistconvert" "go.uber.org/mock/gomock" @@ -40,7 +37,6 @@ import ( "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/rest" clientgotesting "k8s.io/client-go/testing" - cache "k8s.io/client-go/tools/cache" ) //go:generate mockgen --build_flags=--mod=mod -package sqlproxy -destination ./proxy_mocks_test.go github.com/rancher/steve/pkg/stores/sqlproxy Cache,ClientGetter,CacheFactory,SchemaColumnSetter,RelationshipNotifier,TransformBuilder @@ -1522,35 +1518,3 @@ func TestUpdate(t *testing.T) { }) } } - -func makeListOptionIndexer(ctx context.Context, fields [][]string) (*informer.ListOptionIndexer, error) { - gvk := schema2.GroupVersionKind{ - Group: "", - Version: "", - Kind: "", - } - example := &unstructured.Unstructured{} - example.SetGroupVersionKind(gvk) - name := "theName" - m, err := encryption.NewManager() - if err != nil { - return nil, err - } - - db, err := db.NewClient(nil, m, m) - if err != nil { - return nil, err - } - - s, err := store.NewStore(ctx, example, cache.DeletionHandlingMetaNamespaceKeyFunc, db, false, name) - if err != nil { - return nil, err - } - - listOptionIndexer, err := informer.NewListOptionIndexer(ctx, fields, s, true) - if err != nil { - return nil, err - } - - return listOptionIndexer, nil -}