diff --git a/pkg/sqlcache/informer/factory/informer_factory.go b/pkg/sqlcache/informer/factory/informer_factory.go index 8a43ccc7..644a2216 100644 --- a/pkg/sqlcache/informer/factory/informer_factory.go +++ b/pkg/sqlcache/informer/factory/informer_factory.go @@ -148,7 +148,7 @@ func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, ex start := time.Now() log.Infof("CacheFor STARTS creating informer for %v", gvk) defer func() { - log.Infof("CacheFor IS DONE creating informer for %v (took %v)", gvk, time.Now().Sub(start)) + log.Infof("CacheFor IS DONE creating informer for %v (took %v)", gvk, time.Since(start)) }() _, encryptResourceAlways := defaultEncryptedResourceTypes[gvk] @@ -201,25 +201,36 @@ func (f *CacheFactory) Stop() error { return nil } - // first of all wait until all CacheFor() calls that create new informers are finished. Also block any new ones + // We must stop informers here to unblock those stuck in WaitForCacheSync + // which is blocking DoneWithCache call. + // + // This is fine without a lock as long as multiple Stop() call aren't made + // concurrently (which they currently aren't) + f.cancel() + + // Prevent more CacheFor calls f.mutex.Lock() defer f.mutex.Unlock() - // now that we are alone, stop all informers created until this point - f.cancel() + // Wait for all informers to have exited f.wg.Wait() + f.ctx, f.cancel = context.WithCancel(context.Background()) // and get rid of all references to those informers and their mutexes f.informersMutex.Lock() defer f.informersMutex.Unlock() - f.informers = make(map[schema.GroupVersionKind]*guardedInformer) - // finally, reset the DB connection - _, err := f.dbClient.NewConnection(false) - if err != nil { - return err + for gvk, informer := range f.informers { + // DropAll needs its own context because the context from the CacheFactory + // is canceled + err := informer.informer.DropAll(context.Background()) + if err != nil { + return fmt.Errorf("dropall %q: %w", gvk, err) + } } + f.informers = make(map[schema.GroupVersionKind]*guardedInformer) + return nil } diff --git a/pkg/sqlcache/informer/factory/informer_factory_test.go b/pkg/sqlcache/informer/factory/informer_factory_test.go index 8cf471a1..5fcd484e 100644 --- a/pkg/sqlcache/informer/factory/informer_factory_test.go +++ b/pkg/sqlcache/informer/factory/informer_factory_test.go @@ -67,6 +67,7 @@ func TestCacheFor(t *testing.T) { expectedGVK := schema.GroupVersionKind{} bloi := NewMockByOptionsLister(gomock.NewController(t)) bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() + bloi.EXPECT().DropAll(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true).AnyTimes() sii.EXPECT().Run(gomock.Any()).MinTimes(1) @@ -99,11 +100,9 @@ func TestCacheFor(t *testing.T) { go func() { // this function ensures that ctx is open for the duration of this test but if part of a longer process it will be closed eventually time.Sleep(5 * time.Second) - f.cancel() + f.Stop() }() - var c *Cache - var err error - c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) + c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) // this sleep is critical to the test. It ensure there has been enough time for expected function like Run to be invoked in their go routines. @@ -120,6 +119,7 @@ func TestCacheFor(t *testing.T) { bloi := NewMockByOptionsLister(gomock.NewController(t)) bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() + bloi.EXPECT().DropAll(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(false).AnyTimes() sii.EXPECT().Run(gomock.Any()) @@ -148,7 +148,7 @@ func TestCacheFor(t *testing.T) { go func() { time.Sleep(1 * time.Second) - f.cancel() + f.Stop() }() var err error _, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) @@ -163,6 +163,7 @@ func TestCacheFor(t *testing.T) { bloi := NewMockByOptionsLister(gomock.NewController(t)) bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() + bloi.EXPECT().DropAll(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true).AnyTimes() // may or may not call run initially @@ -192,11 +193,9 @@ func TestCacheFor(t *testing.T) { informers: map[schema.GroupVersionKind]*guardedInformer{}, } f.ctx, f.cancel = context.WithCancel(context.Background()) - f.cancel() + f.Stop() - var c *Cache - var err error - c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) + c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) @@ -208,6 +207,7 @@ func TestCacheFor(t *testing.T) { expectedGVK := schema.GroupVersionKind{} bloi := NewMockByOptionsLister(gomock.NewController(t)) bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() + bloi.EXPECT().DropAll(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true) sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes() @@ -240,11 +240,9 @@ func TestCacheFor(t *testing.T) { go func() { time.Sleep(10 * time.Second) - f.cancel() + f.Stop() }() - var c *Cache - var err error - c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) + c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) @@ -261,6 +259,7 @@ func TestCacheFor(t *testing.T) { } bloi := NewMockByOptionsLister(gomock.NewController(t)) bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() + bloi.EXPECT().DropAll(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true) sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes() @@ -293,11 +292,9 @@ func TestCacheFor(t *testing.T) { go func() { time.Sleep(10 * time.Second) - f.cancel() + f.Stop() }() - var c *Cache - var err error - c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) + c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) @@ -313,6 +310,7 @@ func TestCacheFor(t *testing.T) { } bloi := NewMockByOptionsLister(gomock.NewController(t)) bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() + bloi.EXPECT().DropAll(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true) sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes() @@ -345,11 +343,9 @@ func TestCacheFor(t *testing.T) { go func() { time.Sleep(10 * time.Second) - f.cancel() + f.Stop() }() - var c *Cache - var err error - c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) + c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) @@ -362,6 +358,7 @@ func TestCacheFor(t *testing.T) { expectedGVK := schema.GroupVersionKind{} bloi := NewMockByOptionsLister(gomock.NewController(t)) bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() + bloi.EXPECT().DropAll(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true) sii.EXPECT().Run(gomock.Any()).MinTimes(1) @@ -405,7 +402,7 @@ func TestCacheFor(t *testing.T) { go func() { // this function ensures that ctx is not canceled for the duration of this test but if part of a longer process it will be closed eventually time.Sleep(5 * time.Second) - f.cancel() + f.Stop() }() var c *Cache var err error @@ -421,6 +418,7 @@ func TestCacheFor(t *testing.T) { expectedGVK := schema.GroupVersionKind{} bloi := NewMockByOptionsLister(gomock.NewController(t)) bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() + bloi.EXPECT().DropAll(gomock.Any()).AnyTimes() sii := NewMockSharedIndexInformer(gomock.NewController(t)) sii.EXPECT().HasSynced().Return(true) sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes() @@ -456,12 +454,10 @@ func TestCacheFor(t *testing.T) { go func() { time.Sleep(10 * time.Second) - f.cancel() + f.Stop() }() - var c *Cache - var err error // CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) - c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) + c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) assert.Equal(t, expectedC, c) time.Sleep(1 * time.Second) diff --git a/pkg/sqlcache/informer/factory/sql_informer_mocks_test.go b/pkg/sqlcache/informer/factory/sql_informer_mocks_test.go index 421dd62f..f2fff0c2 100644 --- a/pkg/sqlcache/informer/factory/sql_informer_mocks_test.go +++ b/pkg/sqlcache/informer/factory/sql_informer_mocks_test.go @@ -45,6 +45,20 @@ func (m *MockByOptionsLister) EXPECT() *MockByOptionsListerMockRecorder { return m.recorder } +// DropAll mocks base method. +func (m *MockByOptionsLister) DropAll(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DropAll", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DropAll indicates an expected call of DropAll. +func (mr *MockByOptionsListerMockRecorder) DropAll(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DropAll", reflect.TypeOf((*MockByOptionsLister)(nil).DropAll), arg0) +} + // GetLatestResourceVersion mocks base method. func (m *MockByOptionsLister) GetLatestResourceVersion() []string { m.ctrl.T.Helper() diff --git a/pkg/sqlcache/informer/indexer.go b/pkg/sqlcache/informer/indexer.go index 88f8ce03..12c8ae96 100644 --- a/pkg/sqlcache/informer/indexer.go +++ b/pkg/sqlcache/informer/indexer.go @@ -31,6 +31,7 @@ const ( createIndexFmt = `CREATE INDEX IF NOT EXISTS "%[1]s_indices_index" ON "%[1]s_indices"(name, value)` deleteIndicesFmt = `DELETE FROM "%s_indices" WHERE key = ?` + dropIndicesFmt = `DROP TABLE IF EXISTS "%s_indices"` addIndexFmt = `INSERT INTO "%s_indices" (name, value, key) VALUES (?, ?, ?) ON CONFLICT DO NOTHING` listByIndexFmt = `SELECT object, objectnonce, dekid FROM "%[1]s" WHERE key IN ( @@ -50,12 +51,14 @@ type Indexer struct { indexersLock sync.RWMutex deleteIndicesQuery string + dropIndicesQuery string addIndexQuery string listByIndexQuery string listKeysByIndexQuery string listIndexValuesQuery string deleteIndicesStmt *sql.Stmt + dropIndicesStmt *sql.Stmt addIndexStmt *sql.Stmt listByIndexStmt *sql.Stmt listKeysByIndexStmt *sql.Stmt @@ -74,8 +77,10 @@ type Store interface { RegisterAfterUpdate(f func(key string, obj any, tx transaction.Client) error) RegisterAfterDelete(f func(key string, obj any, tx transaction.Client) error) RegisterAfterDeleteAll(f func(tx transaction.Client) error) + RegisterBeforeDropAll(f func(tx transaction.Client) error) GetShouldEncrypt() bool GetType() reflect.Type + DropAll(ctx context.Context) error } // NewIndexer returns a cache.Indexer backed by SQLite for objects of the given example type @@ -106,14 +111,17 @@ func NewIndexer(ctx context.Context, indexers cache.Indexers, s Store) (*Indexer } i.RegisterAfterAdd(i.AfterUpsert) i.RegisterAfterUpdate(i.AfterUpsert) + i.RegisterBeforeDropAll(i.dropIndices) i.deleteIndicesQuery = fmt.Sprintf(deleteIndicesFmt, db.Sanitize(s.GetName())) + i.dropIndicesQuery = fmt.Sprintf(dropIndicesFmt, db.Sanitize(s.GetName())) i.addIndexQuery = fmt.Sprintf(addIndexFmt, db.Sanitize(s.GetName())) i.listByIndexQuery = fmt.Sprintf(listByIndexFmt, db.Sanitize(s.GetName())) i.listKeysByIndexQuery = fmt.Sprintf(listKeyByIndexFmt, db.Sanitize(s.GetName())) i.listIndexValuesQuery = fmt.Sprintf(listIndexValuesFmt, db.Sanitize(s.GetName())) i.deleteIndicesStmt = s.Prepare(i.deleteIndicesQuery) + i.dropIndicesStmt = s.Prepare(i.dropIndicesQuery) i.addIndexStmt = s.Prepare(i.addIndexQuery) i.listByIndexStmt = s.Prepare(i.listByIndexQuery) i.listKeysByIndexStmt = s.Prepare(i.listKeysByIndexQuery) @@ -200,6 +208,14 @@ func (i *Indexer) Index(indexName string, obj any) (result []any, err error) { return i.ReadObjects(rows, i.GetType(), i.GetShouldEncrypt()) } +func (i *Indexer) dropIndices(tx transaction.Client) error { + _, err := tx.Stmt(i.dropIndicesStmt).Exec() + if err != nil { + return &db.QueryError{QueryString: i.dropIndicesQuery, Err: err} + } + return nil +} + // ByIndex returns the stored objects whose set of indexed values // for the named index includes the given indexed value func (i *Indexer) ByIndex(indexName, indexedValue string) ([]any, error) { diff --git a/pkg/sqlcache/informer/indexer_test.go b/pkg/sqlcache/informer/indexer_test.go index 1d4f675b..2c549921 100644 --- a/pkg/sqlcache/informer/indexer_test.go +++ b/pkg/sqlcache/informer/indexer_test.go @@ -60,7 +60,9 @@ func TestNewIndexer(t *testing.T) { }) store.EXPECT().RegisterAfterAdd(gomock.Any()) store.EXPECT().RegisterAfterUpdate(gomock.Any()) + store.EXPECT().RegisterBeforeDropAll(gomock.Any()) store.EXPECT().Prepare(fmt.Sprintf(deleteIndicesFmt, storeName)) + store.EXPECT().Prepare(fmt.Sprintf(dropIndicesFmt, storeName)) store.EXPECT().Prepare(fmt.Sprintf(addIndexFmt, storeName)) store.EXPECT().Prepare(fmt.Sprintf(listByIndexFmt, storeName, storeName)) store.EXPECT().Prepare(fmt.Sprintf(listKeyByIndexFmt, storeName)) diff --git a/pkg/sqlcache/informer/informer.go b/pkg/sqlcache/informer/informer.go index 7dba8dcc..66c776cd 100644 --- a/pkg/sqlcache/informer/informer.go +++ b/pkg/sqlcache/informer/informer.go @@ -51,6 +51,7 @@ type ByOptionsLister interface { Watch(ctx context.Context, options WatchOptions, eventsCh chan<- watch.Event) error GetLatestResourceVersion() []string RunGC(context.Context) + DropAll(context.Context) error } // this is set to a var so that it can be overridden by test code for mocking purposes diff --git a/pkg/sqlcache/informer/informer_mocks_test.go b/pkg/sqlcache/informer/informer_mocks_test.go index 6bc72099..09796c11 100644 --- a/pkg/sqlcache/informer/informer_mocks_test.go +++ b/pkg/sqlcache/informer/informer_mocks_test.go @@ -44,6 +44,20 @@ func (m *MockByOptionsLister) EXPECT() *MockByOptionsListerMockRecorder { return m.recorder } +// DropAll mocks base method. +func (m *MockByOptionsLister) DropAll(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DropAll", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DropAll indicates an expected call of DropAll. +func (mr *MockByOptionsListerMockRecorder) DropAll(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DropAll", reflect.TypeOf((*MockByOptionsLister)(nil).DropAll), arg0) +} + // GetLatestResourceVersion mocks base method. func (m *MockByOptionsLister) GetLatestResourceVersion() []string { m.ctrl.T.Helper() diff --git a/pkg/sqlcache/informer/listoption_indexer.go b/pkg/sqlcache/informer/listoption_indexer.go index b667af0e..972c6e23 100644 --- a/pkg/sqlcache/informer/listoption_indexer.go +++ b/pkg/sqlcache/informer/listoption_indexer.go @@ -53,23 +53,29 @@ type ListOptionIndexer struct { findEventsRowByRVQuery string listEventsAfterQuery string deleteEventsByCountQuery string + dropEventsQuery string addFieldsQuery string deleteFieldsByKeyQuery string deleteFieldsQuery string + dropFieldsQuery string upsertLabelsQuery string deleteLabelsByKeyQuery string deleteLabelsQuery string + dropLabelsQuery string upsertEventsStmt *sql.Stmt findEventsRowByRVStmt *sql.Stmt listEventsAfterStmt *sql.Stmt deleteEventsByCountStmt *sql.Stmt + dropEventsStmt *sql.Stmt addFieldsStmt *sql.Stmt deleteFieldsByKeyStmt *sql.Stmt deleteFieldsStmt *sql.Stmt + dropFieldsStmt *sql.Stmt upsertLabelsStmt *sql.Stmt deleteLabelsByKeyStmt *sql.Stmt deleteLabelsStmt *sql.Stmt + dropLabelsStmt *sql.Stmt } var ( @@ -112,6 +118,7 @@ const ( SELECT rowid FROM "%s_events" ORDER BY rowid DESC LIMIT ? ) q )` + dropEventsFmt = `DROP TABLE IF EXISTS "%s_events"` createFieldsTableFmt = `CREATE TABLE "%s_fields" ( key TEXT NOT NULL PRIMARY KEY, @@ -119,6 +126,7 @@ const ( )` createFieldsIndexFmt = `CREATE INDEX "%s_%s_index" ON "%s_fields"("%s")` deleteFieldsFmt = `DELETE FROM "%s_fields"` + dropFieldsFmt = `DROP TABLE IF EXISTS "%s_fields"` failedToGetFromSliceFmt = "[listoption indexer] failed to get subfield [%s] from slice items" @@ -133,6 +141,7 @@ const ( upsertLabelsStmtFmt = `REPLACE INTO "%s_labels"(key, label, value) VALUES (?, ?, ?)` deleteLabelsByKeyStmtFmt = `DELETE FROM "%s_labels" WHERE KEY = ?` deleteLabelsStmtFmt = `DELETE FROM "%s_labels"` + dropLabelsStmtFmt = `DROP TABLE IF EXISTS "%s_labels"` ) type ListOptionIndexerOptions struct { @@ -190,6 +199,9 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp l.RegisterAfterDelete(l.notifyEventDeleted) l.RegisterAfterDeleteAll(l.deleteFields) l.RegisterAfterDeleteAll(l.deleteLabels) + l.RegisterBeforeDropAll(l.dropEvents) + l.RegisterBeforeDropAll(l.dropLabels) + l.RegisterBeforeDropAll(l.dropFields) columnDefs := make([]string, len(indexedFields)) for index, field := range indexedFields { column := fmt.Sprintf(`"%s" TEXT`, field) @@ -266,6 +278,9 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp l.deleteEventsByCountQuery = fmt.Sprintf(deleteEventsByCountFmt, dbName, dbName) l.deleteEventsByCountStmt = l.Prepare(l.deleteEventsByCountQuery) + l.dropEventsQuery = fmt.Sprintf(dropEventsFmt, dbName) + l.dropEventsStmt = l.Prepare(l.dropEventsQuery) + l.addFieldsQuery = fmt.Sprintf( `INSERT INTO "%s_fields"(key, %s) VALUES (?, %s) ON CONFLICT DO UPDATE SET %s`, dbName, @@ -275,17 +290,21 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp ) l.deleteFieldsByKeyQuery = fmt.Sprintf(`DELETE FROM "%s_fields" WHERE key = ?`, dbName) l.deleteFieldsQuery = fmt.Sprintf(deleteFieldsFmt, dbName) + l.dropFieldsQuery = fmt.Sprintf(dropFieldsFmt, dbName) l.addFieldsStmt = l.Prepare(l.addFieldsQuery) l.deleteFieldsByKeyStmt = l.Prepare(l.deleteFieldsByKeyQuery) l.deleteFieldsStmt = l.Prepare(l.deleteFieldsQuery) + l.dropFieldsStmt = l.Prepare(l.dropFieldsQuery) l.upsertLabelsQuery = fmt.Sprintf(upsertLabelsStmtFmt, dbName) l.deleteLabelsByKeyQuery = fmt.Sprintf(deleteLabelsByKeyStmtFmt, dbName) l.deleteLabelsQuery = fmt.Sprintf(deleteLabelsStmtFmt, dbName) + l.dropLabelsQuery = fmt.Sprintf(dropLabelsStmtFmt, dbName) l.upsertLabelsStmt = l.Prepare(l.upsertLabelsQuery) l.deleteLabelsByKeyStmt = l.Prepare(l.deleteLabelsByKeyQuery) l.deleteLabelsStmt = l.Prepare(l.deleteLabelsQuery) + l.dropLabelsStmt = l.Prepare(l.dropLabelsQuery) l.gcInterval = opts.GCInterval l.gcKeepCount = opts.GCKeepCount @@ -533,6 +552,14 @@ func (l *ListOptionIndexer) upsertEvent(tx transaction.Client, eventType watch.E return err } +func (l *ListOptionIndexer) dropEvents(tx transaction.Client) error { + _, err := tx.Stmt(l.dropEventsStmt).Exec() + if err != nil { + return &db.QueryError{QueryString: l.dropEventsQuery, Err: err} + } + return nil +} + // addIndexFields saves sortable/filterable fields into tables func (l *ListOptionIndexer) addIndexFields(key string, obj any, tx transaction.Client) error { args := []any{key} @@ -596,6 +623,14 @@ func (l *ListOptionIndexer) deleteFields(tx transaction.Client) error { return nil } +func (l *ListOptionIndexer) dropFields(tx transaction.Client) error { + _, err := tx.Stmt(l.dropFieldsStmt).Exec() + if err != nil { + return &db.QueryError{QueryString: l.dropFieldsQuery, Err: err} + } + return nil +} + func (l *ListOptionIndexer) deleteLabelsByKey(key string, _ any, tx transaction.Client) error { _, err := tx.Stmt(l.deleteLabelsByKeyStmt).Exec(key) if err != nil { @@ -612,6 +647,14 @@ func (l *ListOptionIndexer) deleteLabels(tx transaction.Client) error { return nil } +func (l *ListOptionIndexer) dropLabels(tx transaction.Client) error { + _, err := tx.Stmt(l.dropLabelsStmt).Exec() + if err != nil { + return &db.QueryError{QueryString: l.dropLabelsQuery, Err: err} + } + return nil +} + // ListByOptions returns objects according to the specified list options and partitions. // Specifically: // - an unstructured list of resources belonging to any of the specified partitions diff --git a/pkg/sqlcache/informer/listoption_indexer_test.go b/pkg/sqlcache/informer/listoption_indexer_test.go index 9410a8cf..10d37393 100644 --- a/pkg/sqlcache/informer/listoption_indexer_test.go +++ b/pkg/sqlcache/informer/listoption_indexer_test.go @@ -65,10 +65,12 @@ func makeListOptionIndexer(ctx context.Context, opts ListOptionIndexerOptions, s if err != nil { return nil, "", err } - for _, item := range nsList.Items { - err = listOptionIndexer.Add(&item) - if err != nil { - return nil, "", err + if nsList != nil { + for _, item := range nsList.Items { + err = listOptionIndexer.Add(&item) + if err != nil { + return nil, "", err + } } } @@ -144,6 +146,7 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) + store.EXPECT().RegisterBeforeDropAll(gomock.Any()).AnyTimes() // create events table txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) @@ -221,6 +224,7 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) + store.EXPECT().RegisterBeforeDropAll(gomock.Any()).AnyTimes() store.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(fmt.Errorf("error")) @@ -256,6 +260,7 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) + store.EXPECT().RegisterBeforeDropAll(gomock.Any()).AnyTimes() txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil) @@ -301,6 +306,7 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) + store.EXPECT().RegisterBeforeDropAll(gomock.Any()).AnyTimes() txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil) @@ -350,6 +356,7 @@ func TestNewListOptionIndexer(t *testing.T) { store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3) store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2) + store.EXPECT().RegisterBeforeDropAll(gomock.Any()).AnyTimes() txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil) txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil) @@ -1223,6 +1230,35 @@ func TestNewListOptionIndexerEasy(t *testing.T) { } } +func TestDropAll(t *testing.T) { + ctx := t.Context() + + opts := ListOptionIndexerOptions{ + IsNamespaced: true, + } + loi, dbPath, err := makeListOptionIndexer(ctx, opts, false, nil) + defer cleanTempFiles(dbPath) + assert.NoError(t, err) + + obj1 := &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "name": "obj1", + }, + }, + } + err = loi.Add(obj1) + assert.NoError(t, err) + + _, _, _, err = loi.ListByOptions(ctx, &sqltypes.ListOptions{}, []partition.Partition{{All: true}}, "") + assert.NoError(t, err) + + loi.DropAll(ctx) + + _, _, _, err = loi.ListByOptions(ctx, &sqltypes.ListOptions{}, []partition.Partition{{All: true}}, "") + assert.Error(t, err) +} + func makePseudoRandomList(size int) *unstructured.UnstructuredList { numLength := 1 + int(math.Floor(math.Log10(float64(size)))) name_template := fmt.Sprintf("n%%0%dd", numLength) diff --git a/pkg/sqlcache/informer/sql_mocks_test.go b/pkg/sqlcache/informer/sql_mocks_test.go index 4f92758a..7695e631 100644 --- a/pkg/sqlcache/informer/sql_mocks_test.go +++ b/pkg/sqlcache/informer/sql_mocks_test.go @@ -99,6 +99,20 @@ func (mr *MockStoreMockRecorder) Delete(obj any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), obj) } +// DropAll mocks base method. +func (m *MockStore) DropAll(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DropAll", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// DropAll indicates an expected call of DropAll. +func (mr *MockStoreMockRecorder) DropAll(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DropAll", reflect.TypeOf((*MockStore)(nil).DropAll), ctx) +} + // Encryptor mocks base method. func (m *MockStore) Encryptor() db.Encryptor { m.ctrl.T.Helper() @@ -372,6 +386,18 @@ func (mr *MockStoreMockRecorder) RegisterAfterUpdate(f any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterAfterUpdate", reflect.TypeOf((*MockStore)(nil).RegisterAfterUpdate), f) } +// RegisterBeforeDropAll mocks base method. +func (m *MockStore) RegisterBeforeDropAll(f func(transaction.Client) error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RegisterBeforeDropAll", f) +} + +// RegisterBeforeDropAll indicates an expected call of RegisterBeforeDropAll. +func (mr *MockStoreMockRecorder) RegisterBeforeDropAll(f any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterBeforeDropAll", reflect.TypeOf((*MockStore)(nil).RegisterBeforeDropAll), f) +} + // Replace mocks base method. func (m *MockStore) Replace(arg0 []any, arg1 string) error { m.ctrl.T.Helper() diff --git a/pkg/sqlcache/store/store.go b/pkg/sqlcache/store/store.go index ea7748ed..3c73a477 100644 --- a/pkg/sqlcache/store/store.go +++ b/pkg/sqlcache/store/store.go @@ -26,6 +26,7 @@ const ( upsertStmtFmt = `REPLACE INTO "%s"(key, object, objectnonce, dekid) VALUES (?, ?, ?, ?)` deleteStmtFmt = `DELETE FROM "%s" WHERE key = ?` deleteAllStmtFmt = `DELETE FROM "%s"` + dropBaseStmtFmt = `DROP TABLE IF EXISTS "%s"` getStmtFmt = `SELECT object, objectnonce, dekid FROM "%s" WHERE key = ?` listStmtFmt = `SELECT object, objectnonce, dekid FROM "%s"` listKeysStmtFmt = `SELECT key FROM "%s"` @@ -56,10 +57,12 @@ type Store struct { getQuery string listQuery string listKeysQuery string + dropBaseQuery string upsertStmt *sql.Stmt deleteStmt *sql.Stmt deleteAllStmt *sql.Stmt + dropBaseStmt *sql.Stmt getStmt *sql.Stmt listStmt *sql.Stmt listKeysStmt *sql.Stmt @@ -68,6 +71,7 @@ type Store struct { afterUpdate []func(key string, obj any, tx transaction.Client) error afterDelete []func(key string, obj any, tx transaction.Client) error afterDeleteAll []func(tx transaction.Client) error + beforeDropAll []func(tx transaction.Client) error } // Test that Store implements cache.Indexer @@ -110,6 +114,7 @@ func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Clie s.upsertQuery = fmt.Sprintf(upsertStmtFmt, dbName) s.deleteQuery = fmt.Sprintf(deleteStmtFmt, dbName) s.deleteAllQuery = fmt.Sprintf(deleteAllStmtFmt, dbName) + s.dropBaseQuery = fmt.Sprintf(dropBaseStmtFmt, dbName) s.getQuery = fmt.Sprintf(getStmtFmt, dbName) s.listQuery = fmt.Sprintf(listStmtFmt, dbName) s.listKeysQuery = fmt.Sprintf(listKeysStmtFmt, dbName) @@ -117,6 +122,7 @@ func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Clie s.upsertStmt = s.Prepare(s.upsertQuery) s.deleteStmt = s.Prepare(s.deleteQuery) s.deleteAllStmt = s.Prepare(s.deleteAllQuery) + s.dropBaseStmt = s.Prepare(s.dropBaseQuery) s.getStmt = s.Prepare(s.getQuery) s.listStmt = s.Prepare(s.listQuery) s.listKeysStmt = s.Prepare(s.listKeysQuery) @@ -524,6 +530,34 @@ func (s *Store) RegisterAfterDeleteAll(f func(txC transaction.Client) error) { s.afterDeleteAll = append(s.afterDeleteAll, f) } +func (s *Store) RegisterBeforeDropAll(f func(txC transaction.Client) error) { + s.beforeDropAll = append(s.beforeDropAll, f) +} + +// DropAll effectively removes the store from the database. The store must be +// recreated with NewStore. +// +// The store shouldn't be used once DropAll is called. +func (s *Store) DropAll(ctx context.Context) error { + err := s.WithTransaction(ctx, true, func(tx transaction.Client) error { + err := s.runBeforeDropAll(tx) + if err != nil { + return err + } + + _, err = tx.Stmt(s.dropBaseStmt).Exec(s.GetName()) + if err != nil { + return &db.QueryError{QueryString: s.dropBaseQuery, Err: err} + } + + return nil + }) + if err != nil { + return fmt.Errorf("dropall for %q: %w", s.GetName(), err) + } + return nil +} + // runAfterAdd executes functions registered to run after add event func (s *Store) runAfterAdd(key string, obj any, txC transaction.Client) error { for _, f := range s.afterAdd { @@ -568,3 +602,13 @@ func (s *Store) runAfterDeleteAll(txC transaction.Client) error { } return nil } + +func (s *Store) runBeforeDropAll(txC transaction.Client) error { + for _, f := range s.beforeDropAll { + err := f(txC) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/sqlcache/store/store_test.go b/pkg/sqlcache/store/store_test.go index 2042f6a1..daf6be12 100644 --- a/pkg/sqlcache/store/store_test.go +++ b/pkg/sqlcache/store/store_test.go @@ -1028,6 +1028,7 @@ func SetupMockDB(t *testing.T) (*MockClient, *MockTXClient) { dbC.EXPECT().Prepare(fmt.Sprintf(upsertStmtFmt, "testStoreObject")).Return(&sql.Stmt{}) dbC.EXPECT().Prepare(fmt.Sprintf(deleteStmtFmt, "testStoreObject")).Return(&sql.Stmt{}) dbC.EXPECT().Prepare(fmt.Sprintf(deleteAllStmtFmt, "testStoreObject")).Return(&sql.Stmt{}) + dbC.EXPECT().Prepare(fmt.Sprintf(dropBaseStmtFmt, "testStoreObject")).Return(&sql.Stmt{}) dbC.EXPECT().Prepare(fmt.Sprintf(getStmtFmt, "testStoreObject")).Return(&sql.Stmt{}) dbC.EXPECT().Prepare(fmt.Sprintf(listStmtFmt, "testStoreObject")).Return(&sql.Stmt{}) dbC.EXPECT().Prepare(fmt.Sprintf(listKeysStmtFmt, "testStoreObject")).Return(&sql.Stmt{}) diff --git a/pkg/stores/sqlproxy/sql_informer_mocks_test.go b/pkg/stores/sqlproxy/sql_informer_mocks_test.go index e79c1785..9cb4141a 100644 --- a/pkg/stores/sqlproxy/sql_informer_mocks_test.go +++ b/pkg/stores/sqlproxy/sql_informer_mocks_test.go @@ -45,6 +45,20 @@ func (m *MockByOptionsLister) EXPECT() *MockByOptionsListerMockRecorder { return m.recorder } +// DropAll mocks base method. +func (m *MockByOptionsLister) DropAll(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DropAll", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// DropAll indicates an expected call of DropAll. +func (mr *MockByOptionsListerMockRecorder) DropAll(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DropAll", reflect.TypeOf((*MockByOptionsLister)(nil).DropAll), arg0) +} + // GetLatestResourceVersion mocks base method. func (m *MockByOptionsLister) GetLatestResourceVersion() []string { m.ctrl.T.Helper()