mirror of
https://github.com/rancher/steve.git
synced 2025-09-23 12:29:09 +00:00
Drop tables instead of removing the database completely (#807)
* Add drop tables * Fix deadlock * Use f.Stop in tests * Rename dropAllStmtFmt to dropBaseStmtFmt * Fix comment * More dropAll->dropBase renaming
This commit is contained in:
@@ -148,7 +148,7 @@ func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, ex
|
||||
start := time.Now()
|
||||
log.Infof("CacheFor STARTS creating informer for %v", gvk)
|
||||
defer func() {
|
||||
log.Infof("CacheFor IS DONE creating informer for %v (took %v)", gvk, time.Now().Sub(start))
|
||||
log.Infof("CacheFor IS DONE creating informer for %v (took %v)", gvk, time.Since(start))
|
||||
}()
|
||||
|
||||
_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
|
||||
@@ -201,25 +201,36 @@ func (f *CacheFactory) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// first of all wait until all CacheFor() calls that create new informers are finished. Also block any new ones
|
||||
// We must stop informers here to unblock those stuck in WaitForCacheSync
|
||||
// which is blocking DoneWithCache call.
|
||||
//
|
||||
// This is fine without a lock as long as multiple Stop() call aren't made
|
||||
// concurrently (which they currently aren't)
|
||||
f.cancel()
|
||||
|
||||
// Prevent more CacheFor calls
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
// now that we are alone, stop all informers created until this point
|
||||
f.cancel()
|
||||
// Wait for all informers to have exited
|
||||
f.wg.Wait()
|
||||
|
||||
f.ctx, f.cancel = context.WithCancel(context.Background())
|
||||
|
||||
// and get rid of all references to those informers and their mutexes
|
||||
f.informersMutex.Lock()
|
||||
defer f.informersMutex.Unlock()
|
||||
f.informers = make(map[schema.GroupVersionKind]*guardedInformer)
|
||||
|
||||
// finally, reset the DB connection
|
||||
_, err := f.dbClient.NewConnection(false)
|
||||
if err != nil {
|
||||
return err
|
||||
for gvk, informer := range f.informers {
|
||||
// DropAll needs its own context because the context from the CacheFactory
|
||||
// is canceled
|
||||
err := informer.informer.DropAll(context.Background())
|
||||
if err != nil {
|
||||
return fmt.Errorf("dropall %q: %w", gvk, err)
|
||||
}
|
||||
}
|
||||
|
||||
f.informers = make(map[schema.GroupVersionKind]*guardedInformer)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@@ -67,6 +67,7 @@ func TestCacheFor(t *testing.T) {
|
||||
expectedGVK := schema.GroupVersionKind{}
|
||||
bloi := NewMockByOptionsLister(gomock.NewController(t))
|
||||
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
|
||||
bloi.EXPECT().DropAll(gomock.Any()).AnyTimes()
|
||||
sii := NewMockSharedIndexInformer(gomock.NewController(t))
|
||||
sii.EXPECT().HasSynced().Return(true).AnyTimes()
|
||||
sii.EXPECT().Run(gomock.Any()).MinTimes(1)
|
||||
@@ -99,11 +100,9 @@ func TestCacheFor(t *testing.T) {
|
||||
go func() {
|
||||
// this function ensures that ctx is open for the duration of this test but if part of a longer process it will be closed eventually
|
||||
time.Sleep(5 * time.Second)
|
||||
f.cancel()
|
||||
f.Stop()
|
||||
}()
|
||||
var c *Cache
|
||||
var err error
|
||||
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, expectedC, c)
|
||||
// this sleep is critical to the test. It ensure there has been enough time for expected function like Run to be invoked in their go routines.
|
||||
@@ -120,6 +119,7 @@ func TestCacheFor(t *testing.T) {
|
||||
|
||||
bloi := NewMockByOptionsLister(gomock.NewController(t))
|
||||
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
|
||||
bloi.EXPECT().DropAll(gomock.Any()).AnyTimes()
|
||||
sii := NewMockSharedIndexInformer(gomock.NewController(t))
|
||||
sii.EXPECT().HasSynced().Return(false).AnyTimes()
|
||||
sii.EXPECT().Run(gomock.Any())
|
||||
@@ -148,7 +148,7 @@ func TestCacheFor(t *testing.T) {
|
||||
|
||||
go func() {
|
||||
time.Sleep(1 * time.Second)
|
||||
f.cancel()
|
||||
f.Stop()
|
||||
}()
|
||||
var err error
|
||||
_, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
@@ -163,6 +163,7 @@ func TestCacheFor(t *testing.T) {
|
||||
|
||||
bloi := NewMockByOptionsLister(gomock.NewController(t))
|
||||
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
|
||||
bloi.EXPECT().DropAll(gomock.Any()).AnyTimes()
|
||||
sii := NewMockSharedIndexInformer(gomock.NewController(t))
|
||||
sii.EXPECT().HasSynced().Return(true).AnyTimes()
|
||||
// may or may not call run initially
|
||||
@@ -192,11 +193,9 @@ func TestCacheFor(t *testing.T) {
|
||||
informers: map[schema.GroupVersionKind]*guardedInformer{},
|
||||
}
|
||||
f.ctx, f.cancel = context.WithCancel(context.Background())
|
||||
f.cancel()
|
||||
f.Stop()
|
||||
|
||||
var c *Cache
|
||||
var err error
|
||||
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, expectedC, c)
|
||||
time.Sleep(1 * time.Second)
|
||||
@@ -208,6 +207,7 @@ func TestCacheFor(t *testing.T) {
|
||||
expectedGVK := schema.GroupVersionKind{}
|
||||
bloi := NewMockByOptionsLister(gomock.NewController(t))
|
||||
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
|
||||
bloi.EXPECT().DropAll(gomock.Any()).AnyTimes()
|
||||
sii := NewMockSharedIndexInformer(gomock.NewController(t))
|
||||
sii.EXPECT().HasSynced().Return(true)
|
||||
sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes()
|
||||
@@ -240,11 +240,9 @@ func TestCacheFor(t *testing.T) {
|
||||
|
||||
go func() {
|
||||
time.Sleep(10 * time.Second)
|
||||
f.cancel()
|
||||
f.Stop()
|
||||
}()
|
||||
var c *Cache
|
||||
var err error
|
||||
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, expectedC, c)
|
||||
time.Sleep(1 * time.Second)
|
||||
@@ -261,6 +259,7 @@ func TestCacheFor(t *testing.T) {
|
||||
}
|
||||
bloi := NewMockByOptionsLister(gomock.NewController(t))
|
||||
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
|
||||
bloi.EXPECT().DropAll(gomock.Any()).AnyTimes()
|
||||
sii := NewMockSharedIndexInformer(gomock.NewController(t))
|
||||
sii.EXPECT().HasSynced().Return(true)
|
||||
sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes()
|
||||
@@ -293,11 +292,9 @@ func TestCacheFor(t *testing.T) {
|
||||
|
||||
go func() {
|
||||
time.Sleep(10 * time.Second)
|
||||
f.cancel()
|
||||
f.Stop()
|
||||
}()
|
||||
var c *Cache
|
||||
var err error
|
||||
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, expectedC, c)
|
||||
time.Sleep(1 * time.Second)
|
||||
@@ -313,6 +310,7 @@ func TestCacheFor(t *testing.T) {
|
||||
}
|
||||
bloi := NewMockByOptionsLister(gomock.NewController(t))
|
||||
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
|
||||
bloi.EXPECT().DropAll(gomock.Any()).AnyTimes()
|
||||
sii := NewMockSharedIndexInformer(gomock.NewController(t))
|
||||
sii.EXPECT().HasSynced().Return(true)
|
||||
sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes()
|
||||
@@ -345,11 +343,9 @@ func TestCacheFor(t *testing.T) {
|
||||
|
||||
go func() {
|
||||
time.Sleep(10 * time.Second)
|
||||
f.cancel()
|
||||
f.Stop()
|
||||
}()
|
||||
var c *Cache
|
||||
var err error
|
||||
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, expectedC, c)
|
||||
time.Sleep(1 * time.Second)
|
||||
@@ -362,6 +358,7 @@ func TestCacheFor(t *testing.T) {
|
||||
expectedGVK := schema.GroupVersionKind{}
|
||||
bloi := NewMockByOptionsLister(gomock.NewController(t))
|
||||
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
|
||||
bloi.EXPECT().DropAll(gomock.Any()).AnyTimes()
|
||||
sii := NewMockSharedIndexInformer(gomock.NewController(t))
|
||||
sii.EXPECT().HasSynced().Return(true)
|
||||
sii.EXPECT().Run(gomock.Any()).MinTimes(1)
|
||||
@@ -405,7 +402,7 @@ func TestCacheFor(t *testing.T) {
|
||||
go func() {
|
||||
// this function ensures that ctx is not canceled for the duration of this test but if part of a longer process it will be closed eventually
|
||||
time.Sleep(5 * time.Second)
|
||||
f.cancel()
|
||||
f.Stop()
|
||||
}()
|
||||
var c *Cache
|
||||
var err error
|
||||
@@ -421,6 +418,7 @@ func TestCacheFor(t *testing.T) {
|
||||
expectedGVK := schema.GroupVersionKind{}
|
||||
bloi := NewMockByOptionsLister(gomock.NewController(t))
|
||||
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
|
||||
bloi.EXPECT().DropAll(gomock.Any()).AnyTimes()
|
||||
sii := NewMockSharedIndexInformer(gomock.NewController(t))
|
||||
sii.EXPECT().HasSynced().Return(true)
|
||||
sii.EXPECT().Run(gomock.Any()).MinTimes(1).AnyTimes()
|
||||
@@ -456,12 +454,10 @@ func TestCacheFor(t *testing.T) {
|
||||
|
||||
go func() {
|
||||
time.Sleep(10 * time.Second)
|
||||
f.cancel()
|
||||
f.Stop()
|
||||
}()
|
||||
var c *Cache
|
||||
var err error
|
||||
// CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool)
|
||||
c, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, expectedC, c)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
@@ -45,6 +45,20 @@ func (m *MockByOptionsLister) EXPECT() *MockByOptionsListerMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// DropAll mocks base method.
|
||||
func (m *MockByOptionsLister) DropAll(arg0 context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "DropAll", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// DropAll indicates an expected call of DropAll.
|
||||
func (mr *MockByOptionsListerMockRecorder) DropAll(arg0 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DropAll", reflect.TypeOf((*MockByOptionsLister)(nil).DropAll), arg0)
|
||||
}
|
||||
|
||||
// GetLatestResourceVersion mocks base method.
|
||||
func (m *MockByOptionsLister) GetLatestResourceVersion() []string {
|
||||
m.ctrl.T.Helper()
|
||||
|
@@ -31,6 +31,7 @@ const (
|
||||
createIndexFmt = `CREATE INDEX IF NOT EXISTS "%[1]s_indices_index" ON "%[1]s_indices"(name, value)`
|
||||
|
||||
deleteIndicesFmt = `DELETE FROM "%s_indices" WHERE key = ?`
|
||||
dropIndicesFmt = `DROP TABLE IF EXISTS "%s_indices"`
|
||||
addIndexFmt = `INSERT INTO "%s_indices" (name, value, key) VALUES (?, ?, ?) ON CONFLICT DO NOTHING`
|
||||
listByIndexFmt = `SELECT object, objectnonce, dekid FROM "%[1]s"
|
||||
WHERE key IN (
|
||||
@@ -50,12 +51,14 @@ type Indexer struct {
|
||||
indexersLock sync.RWMutex
|
||||
|
||||
deleteIndicesQuery string
|
||||
dropIndicesQuery string
|
||||
addIndexQuery string
|
||||
listByIndexQuery string
|
||||
listKeysByIndexQuery string
|
||||
listIndexValuesQuery string
|
||||
|
||||
deleteIndicesStmt *sql.Stmt
|
||||
dropIndicesStmt *sql.Stmt
|
||||
addIndexStmt *sql.Stmt
|
||||
listByIndexStmt *sql.Stmt
|
||||
listKeysByIndexStmt *sql.Stmt
|
||||
@@ -74,8 +77,10 @@ type Store interface {
|
||||
RegisterAfterUpdate(f func(key string, obj any, tx transaction.Client) error)
|
||||
RegisterAfterDelete(f func(key string, obj any, tx transaction.Client) error)
|
||||
RegisterAfterDeleteAll(f func(tx transaction.Client) error)
|
||||
RegisterBeforeDropAll(f func(tx transaction.Client) error)
|
||||
GetShouldEncrypt() bool
|
||||
GetType() reflect.Type
|
||||
DropAll(ctx context.Context) error
|
||||
}
|
||||
|
||||
// NewIndexer returns a cache.Indexer backed by SQLite for objects of the given example type
|
||||
@@ -106,14 +111,17 @@ func NewIndexer(ctx context.Context, indexers cache.Indexers, s Store) (*Indexer
|
||||
}
|
||||
i.RegisterAfterAdd(i.AfterUpsert)
|
||||
i.RegisterAfterUpdate(i.AfterUpsert)
|
||||
i.RegisterBeforeDropAll(i.dropIndices)
|
||||
|
||||
i.deleteIndicesQuery = fmt.Sprintf(deleteIndicesFmt, db.Sanitize(s.GetName()))
|
||||
i.dropIndicesQuery = fmt.Sprintf(dropIndicesFmt, db.Sanitize(s.GetName()))
|
||||
i.addIndexQuery = fmt.Sprintf(addIndexFmt, db.Sanitize(s.GetName()))
|
||||
i.listByIndexQuery = fmt.Sprintf(listByIndexFmt, db.Sanitize(s.GetName()))
|
||||
i.listKeysByIndexQuery = fmt.Sprintf(listKeyByIndexFmt, db.Sanitize(s.GetName()))
|
||||
i.listIndexValuesQuery = fmt.Sprintf(listIndexValuesFmt, db.Sanitize(s.GetName()))
|
||||
|
||||
i.deleteIndicesStmt = s.Prepare(i.deleteIndicesQuery)
|
||||
i.dropIndicesStmt = s.Prepare(i.dropIndicesQuery)
|
||||
i.addIndexStmt = s.Prepare(i.addIndexQuery)
|
||||
i.listByIndexStmt = s.Prepare(i.listByIndexQuery)
|
||||
i.listKeysByIndexStmt = s.Prepare(i.listKeysByIndexQuery)
|
||||
@@ -200,6 +208,14 @@ func (i *Indexer) Index(indexName string, obj any) (result []any, err error) {
|
||||
return i.ReadObjects(rows, i.GetType(), i.GetShouldEncrypt())
|
||||
}
|
||||
|
||||
func (i *Indexer) dropIndices(tx transaction.Client) error {
|
||||
_, err := tx.Stmt(i.dropIndicesStmt).Exec()
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: i.dropIndicesQuery, Err: err}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ByIndex returns the stored objects whose set of indexed values
|
||||
// for the named index includes the given indexed value
|
||||
func (i *Indexer) ByIndex(indexName, indexedValue string) ([]any, error) {
|
||||
|
@@ -60,7 +60,9 @@ func TestNewIndexer(t *testing.T) {
|
||||
})
|
||||
store.EXPECT().RegisterAfterAdd(gomock.Any())
|
||||
store.EXPECT().RegisterAfterUpdate(gomock.Any())
|
||||
store.EXPECT().RegisterBeforeDropAll(gomock.Any())
|
||||
store.EXPECT().Prepare(fmt.Sprintf(deleteIndicesFmt, storeName))
|
||||
store.EXPECT().Prepare(fmt.Sprintf(dropIndicesFmt, storeName))
|
||||
store.EXPECT().Prepare(fmt.Sprintf(addIndexFmt, storeName))
|
||||
store.EXPECT().Prepare(fmt.Sprintf(listByIndexFmt, storeName, storeName))
|
||||
store.EXPECT().Prepare(fmt.Sprintf(listKeyByIndexFmt, storeName))
|
||||
|
@@ -51,6 +51,7 @@ type ByOptionsLister interface {
|
||||
Watch(ctx context.Context, options WatchOptions, eventsCh chan<- watch.Event) error
|
||||
GetLatestResourceVersion() []string
|
||||
RunGC(context.Context)
|
||||
DropAll(context.Context) error
|
||||
}
|
||||
|
||||
// this is set to a var so that it can be overridden by test code for mocking purposes
|
||||
|
@@ -44,6 +44,20 @@ func (m *MockByOptionsLister) EXPECT() *MockByOptionsListerMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// DropAll mocks base method.
|
||||
func (m *MockByOptionsLister) DropAll(arg0 context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "DropAll", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// DropAll indicates an expected call of DropAll.
|
||||
func (mr *MockByOptionsListerMockRecorder) DropAll(arg0 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DropAll", reflect.TypeOf((*MockByOptionsLister)(nil).DropAll), arg0)
|
||||
}
|
||||
|
||||
// GetLatestResourceVersion mocks base method.
|
||||
func (m *MockByOptionsLister) GetLatestResourceVersion() []string {
|
||||
m.ctrl.T.Helper()
|
||||
|
@@ -53,23 +53,29 @@ type ListOptionIndexer struct {
|
||||
findEventsRowByRVQuery string
|
||||
listEventsAfterQuery string
|
||||
deleteEventsByCountQuery string
|
||||
dropEventsQuery string
|
||||
addFieldsQuery string
|
||||
deleteFieldsByKeyQuery string
|
||||
deleteFieldsQuery string
|
||||
dropFieldsQuery string
|
||||
upsertLabelsQuery string
|
||||
deleteLabelsByKeyQuery string
|
||||
deleteLabelsQuery string
|
||||
dropLabelsQuery string
|
||||
|
||||
upsertEventsStmt *sql.Stmt
|
||||
findEventsRowByRVStmt *sql.Stmt
|
||||
listEventsAfterStmt *sql.Stmt
|
||||
deleteEventsByCountStmt *sql.Stmt
|
||||
dropEventsStmt *sql.Stmt
|
||||
addFieldsStmt *sql.Stmt
|
||||
deleteFieldsByKeyStmt *sql.Stmt
|
||||
deleteFieldsStmt *sql.Stmt
|
||||
dropFieldsStmt *sql.Stmt
|
||||
upsertLabelsStmt *sql.Stmt
|
||||
deleteLabelsByKeyStmt *sql.Stmt
|
||||
deleteLabelsStmt *sql.Stmt
|
||||
dropLabelsStmt *sql.Stmt
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -112,6 +118,7 @@ const (
|
||||
SELECT rowid FROM "%s_events" ORDER BY rowid DESC LIMIT ?
|
||||
) q
|
||||
)`
|
||||
dropEventsFmt = `DROP TABLE IF EXISTS "%s_events"`
|
||||
|
||||
createFieldsTableFmt = `CREATE TABLE "%s_fields" (
|
||||
key TEXT NOT NULL PRIMARY KEY,
|
||||
@@ -119,6 +126,7 @@ const (
|
||||
)`
|
||||
createFieldsIndexFmt = `CREATE INDEX "%s_%s_index" ON "%s_fields"("%s")`
|
||||
deleteFieldsFmt = `DELETE FROM "%s_fields"`
|
||||
dropFieldsFmt = `DROP TABLE IF EXISTS "%s_fields"`
|
||||
|
||||
failedToGetFromSliceFmt = "[listoption indexer] failed to get subfield [%s] from slice items"
|
||||
|
||||
@@ -133,6 +141,7 @@ const (
|
||||
upsertLabelsStmtFmt = `REPLACE INTO "%s_labels"(key, label, value) VALUES (?, ?, ?)`
|
||||
deleteLabelsByKeyStmtFmt = `DELETE FROM "%s_labels" WHERE KEY = ?`
|
||||
deleteLabelsStmtFmt = `DELETE FROM "%s_labels"`
|
||||
dropLabelsStmtFmt = `DROP TABLE IF EXISTS "%s_labels"`
|
||||
)
|
||||
|
||||
type ListOptionIndexerOptions struct {
|
||||
@@ -190,6 +199,9 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp
|
||||
l.RegisterAfterDelete(l.notifyEventDeleted)
|
||||
l.RegisterAfterDeleteAll(l.deleteFields)
|
||||
l.RegisterAfterDeleteAll(l.deleteLabels)
|
||||
l.RegisterBeforeDropAll(l.dropEvents)
|
||||
l.RegisterBeforeDropAll(l.dropLabels)
|
||||
l.RegisterBeforeDropAll(l.dropFields)
|
||||
columnDefs := make([]string, len(indexedFields))
|
||||
for index, field := range indexedFields {
|
||||
column := fmt.Sprintf(`"%s" TEXT`, field)
|
||||
@@ -266,6 +278,9 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp
|
||||
l.deleteEventsByCountQuery = fmt.Sprintf(deleteEventsByCountFmt, dbName, dbName)
|
||||
l.deleteEventsByCountStmt = l.Prepare(l.deleteEventsByCountQuery)
|
||||
|
||||
l.dropEventsQuery = fmt.Sprintf(dropEventsFmt, dbName)
|
||||
l.dropEventsStmt = l.Prepare(l.dropEventsQuery)
|
||||
|
||||
l.addFieldsQuery = fmt.Sprintf(
|
||||
`INSERT INTO "%s_fields"(key, %s) VALUES (?, %s) ON CONFLICT DO UPDATE SET %s`,
|
||||
dbName,
|
||||
@@ -275,17 +290,21 @@ func NewListOptionIndexer(ctx context.Context, s Store, opts ListOptionIndexerOp
|
||||
)
|
||||
l.deleteFieldsByKeyQuery = fmt.Sprintf(`DELETE FROM "%s_fields" WHERE key = ?`, dbName)
|
||||
l.deleteFieldsQuery = fmt.Sprintf(deleteFieldsFmt, dbName)
|
||||
l.dropFieldsQuery = fmt.Sprintf(dropFieldsFmt, dbName)
|
||||
|
||||
l.addFieldsStmt = l.Prepare(l.addFieldsQuery)
|
||||
l.deleteFieldsByKeyStmt = l.Prepare(l.deleteFieldsByKeyQuery)
|
||||
l.deleteFieldsStmt = l.Prepare(l.deleteFieldsQuery)
|
||||
l.dropFieldsStmt = l.Prepare(l.dropFieldsQuery)
|
||||
|
||||
l.upsertLabelsQuery = fmt.Sprintf(upsertLabelsStmtFmt, dbName)
|
||||
l.deleteLabelsByKeyQuery = fmt.Sprintf(deleteLabelsByKeyStmtFmt, dbName)
|
||||
l.deleteLabelsQuery = fmt.Sprintf(deleteLabelsStmtFmt, dbName)
|
||||
l.dropLabelsQuery = fmt.Sprintf(dropLabelsStmtFmt, dbName)
|
||||
l.upsertLabelsStmt = l.Prepare(l.upsertLabelsQuery)
|
||||
l.deleteLabelsByKeyStmt = l.Prepare(l.deleteLabelsByKeyQuery)
|
||||
l.deleteLabelsStmt = l.Prepare(l.deleteLabelsQuery)
|
||||
l.dropLabelsStmt = l.Prepare(l.dropLabelsQuery)
|
||||
|
||||
l.gcInterval = opts.GCInterval
|
||||
l.gcKeepCount = opts.GCKeepCount
|
||||
@@ -533,6 +552,14 @@ func (l *ListOptionIndexer) upsertEvent(tx transaction.Client, eventType watch.E
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *ListOptionIndexer) dropEvents(tx transaction.Client) error {
|
||||
_, err := tx.Stmt(l.dropEventsStmt).Exec()
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: l.dropEventsQuery, Err: err}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// addIndexFields saves sortable/filterable fields into tables
|
||||
func (l *ListOptionIndexer) addIndexFields(key string, obj any, tx transaction.Client) error {
|
||||
args := []any{key}
|
||||
@@ -596,6 +623,14 @@ func (l *ListOptionIndexer) deleteFields(tx transaction.Client) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *ListOptionIndexer) dropFields(tx transaction.Client) error {
|
||||
_, err := tx.Stmt(l.dropFieldsStmt).Exec()
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: l.dropFieldsQuery, Err: err}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *ListOptionIndexer) deleteLabelsByKey(key string, _ any, tx transaction.Client) error {
|
||||
_, err := tx.Stmt(l.deleteLabelsByKeyStmt).Exec(key)
|
||||
if err != nil {
|
||||
@@ -612,6 +647,14 @@ func (l *ListOptionIndexer) deleteLabels(tx transaction.Client) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *ListOptionIndexer) dropLabels(tx transaction.Client) error {
|
||||
_, err := tx.Stmt(l.dropLabelsStmt).Exec()
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: l.dropLabelsQuery, Err: err}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListByOptions returns objects according to the specified list options and partitions.
|
||||
// Specifically:
|
||||
// - an unstructured list of resources belonging to any of the specified partitions
|
||||
|
@@ -65,10 +65,12 @@ func makeListOptionIndexer(ctx context.Context, opts ListOptionIndexerOptions, s
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
for _, item := range nsList.Items {
|
||||
err = listOptionIndexer.Add(&item)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
if nsList != nil {
|
||||
for _, item := range nsList.Items {
|
||||
err = listOptionIndexer.Add(&item)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,6 +146,7 @@ func TestNewListOptionIndexer(t *testing.T) {
|
||||
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
|
||||
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
|
||||
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
|
||||
store.EXPECT().RegisterBeforeDropAll(gomock.Any()).AnyTimes()
|
||||
|
||||
// create events table
|
||||
txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil)
|
||||
@@ -221,6 +224,7 @@ func TestNewListOptionIndexer(t *testing.T) {
|
||||
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
|
||||
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
|
||||
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
|
||||
store.EXPECT().RegisterBeforeDropAll(gomock.Any()).AnyTimes()
|
||||
|
||||
store.EXPECT().WithTransaction(gomock.Any(), true, gomock.Any()).Return(fmt.Errorf("error"))
|
||||
|
||||
@@ -256,6 +260,7 @@ func TestNewListOptionIndexer(t *testing.T) {
|
||||
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
|
||||
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
|
||||
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
|
||||
store.EXPECT().RegisterBeforeDropAll(gomock.Any()).AnyTimes()
|
||||
|
||||
txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil)
|
||||
txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil)
|
||||
@@ -301,6 +306,7 @@ func TestNewListOptionIndexer(t *testing.T) {
|
||||
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
|
||||
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
|
||||
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
|
||||
store.EXPECT().RegisterBeforeDropAll(gomock.Any()).AnyTimes()
|
||||
|
||||
txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil)
|
||||
txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil)
|
||||
@@ -350,6 +356,7 @@ func TestNewListOptionIndexer(t *testing.T) {
|
||||
store.EXPECT().RegisterAfterUpdate(gomock.Any()).Times(3)
|
||||
store.EXPECT().RegisterAfterDelete(gomock.Any()).Times(3)
|
||||
store.EXPECT().RegisterAfterDeleteAll(gomock.Any()).Times(2)
|
||||
store.EXPECT().RegisterBeforeDropAll(gomock.Any()).AnyTimes()
|
||||
|
||||
txClient.EXPECT().Exec(fmt.Sprintf(createEventsTableFmt, id)).Return(nil, nil)
|
||||
txClient.EXPECT().Exec(fmt.Sprintf(createFieldsTableFmt, id, `"metadata.name" TEXT, "metadata.creationTimestamp" TEXT, "metadata.namespace" TEXT, "something" TEXT`)).Return(nil, nil)
|
||||
@@ -1223,6 +1230,35 @@ func TestNewListOptionIndexerEasy(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDropAll(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
|
||||
opts := ListOptionIndexerOptions{
|
||||
IsNamespaced: true,
|
||||
}
|
||||
loi, dbPath, err := makeListOptionIndexer(ctx, opts, false, nil)
|
||||
defer cleanTempFiles(dbPath)
|
||||
assert.NoError(t, err)
|
||||
|
||||
obj1 := &unstructured.Unstructured{
|
||||
Object: map[string]any{
|
||||
"metadata": map[string]any{
|
||||
"name": "obj1",
|
||||
},
|
||||
},
|
||||
}
|
||||
err = loi.Add(obj1)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, _, _, err = loi.ListByOptions(ctx, &sqltypes.ListOptions{}, []partition.Partition{{All: true}}, "")
|
||||
assert.NoError(t, err)
|
||||
|
||||
loi.DropAll(ctx)
|
||||
|
||||
_, _, _, err = loi.ListByOptions(ctx, &sqltypes.ListOptions{}, []partition.Partition{{All: true}}, "")
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func makePseudoRandomList(size int) *unstructured.UnstructuredList {
|
||||
numLength := 1 + int(math.Floor(math.Log10(float64(size))))
|
||||
name_template := fmt.Sprintf("n%%0%dd", numLength)
|
||||
|
@@ -99,6 +99,20 @@ func (mr *MockStoreMockRecorder) Delete(obj any) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockStore)(nil).Delete), obj)
|
||||
}
|
||||
|
||||
// DropAll mocks base method.
|
||||
func (m *MockStore) DropAll(ctx context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "DropAll", ctx)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// DropAll indicates an expected call of DropAll.
|
||||
func (mr *MockStoreMockRecorder) DropAll(ctx any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DropAll", reflect.TypeOf((*MockStore)(nil).DropAll), ctx)
|
||||
}
|
||||
|
||||
// Encryptor mocks base method.
|
||||
func (m *MockStore) Encryptor() db.Encryptor {
|
||||
m.ctrl.T.Helper()
|
||||
@@ -372,6 +386,18 @@ func (mr *MockStoreMockRecorder) RegisterAfterUpdate(f any) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterAfterUpdate", reflect.TypeOf((*MockStore)(nil).RegisterAfterUpdate), f)
|
||||
}
|
||||
|
||||
// RegisterBeforeDropAll mocks base method.
|
||||
func (m *MockStore) RegisterBeforeDropAll(f func(transaction.Client) error) {
|
||||
m.ctrl.T.Helper()
|
||||
m.ctrl.Call(m, "RegisterBeforeDropAll", f)
|
||||
}
|
||||
|
||||
// RegisterBeforeDropAll indicates an expected call of RegisterBeforeDropAll.
|
||||
func (mr *MockStoreMockRecorder) RegisterBeforeDropAll(f any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterBeforeDropAll", reflect.TypeOf((*MockStore)(nil).RegisterBeforeDropAll), f)
|
||||
}
|
||||
|
||||
// Replace mocks base method.
|
||||
func (m *MockStore) Replace(arg0 []any, arg1 string) error {
|
||||
m.ctrl.T.Helper()
|
||||
|
@@ -26,6 +26,7 @@ const (
|
||||
upsertStmtFmt = `REPLACE INTO "%s"(key, object, objectnonce, dekid) VALUES (?, ?, ?, ?)`
|
||||
deleteStmtFmt = `DELETE FROM "%s" WHERE key = ?`
|
||||
deleteAllStmtFmt = `DELETE FROM "%s"`
|
||||
dropBaseStmtFmt = `DROP TABLE IF EXISTS "%s"`
|
||||
getStmtFmt = `SELECT object, objectnonce, dekid FROM "%s" WHERE key = ?`
|
||||
listStmtFmt = `SELECT object, objectnonce, dekid FROM "%s"`
|
||||
listKeysStmtFmt = `SELECT key FROM "%s"`
|
||||
@@ -56,10 +57,12 @@ type Store struct {
|
||||
getQuery string
|
||||
listQuery string
|
||||
listKeysQuery string
|
||||
dropBaseQuery string
|
||||
|
||||
upsertStmt *sql.Stmt
|
||||
deleteStmt *sql.Stmt
|
||||
deleteAllStmt *sql.Stmt
|
||||
dropBaseStmt *sql.Stmt
|
||||
getStmt *sql.Stmt
|
||||
listStmt *sql.Stmt
|
||||
listKeysStmt *sql.Stmt
|
||||
@@ -68,6 +71,7 @@ type Store struct {
|
||||
afterUpdate []func(key string, obj any, tx transaction.Client) error
|
||||
afterDelete []func(key string, obj any, tx transaction.Client) error
|
||||
afterDeleteAll []func(tx transaction.Client) error
|
||||
beforeDropAll []func(tx transaction.Client) error
|
||||
}
|
||||
|
||||
// Test that Store implements cache.Indexer
|
||||
@@ -110,6 +114,7 @@ func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Clie
|
||||
s.upsertQuery = fmt.Sprintf(upsertStmtFmt, dbName)
|
||||
s.deleteQuery = fmt.Sprintf(deleteStmtFmt, dbName)
|
||||
s.deleteAllQuery = fmt.Sprintf(deleteAllStmtFmt, dbName)
|
||||
s.dropBaseQuery = fmt.Sprintf(dropBaseStmtFmt, dbName)
|
||||
s.getQuery = fmt.Sprintf(getStmtFmt, dbName)
|
||||
s.listQuery = fmt.Sprintf(listStmtFmt, dbName)
|
||||
s.listKeysQuery = fmt.Sprintf(listKeysStmtFmt, dbName)
|
||||
@@ -117,6 +122,7 @@ func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Clie
|
||||
s.upsertStmt = s.Prepare(s.upsertQuery)
|
||||
s.deleteStmt = s.Prepare(s.deleteQuery)
|
||||
s.deleteAllStmt = s.Prepare(s.deleteAllQuery)
|
||||
s.dropBaseStmt = s.Prepare(s.dropBaseQuery)
|
||||
s.getStmt = s.Prepare(s.getQuery)
|
||||
s.listStmt = s.Prepare(s.listQuery)
|
||||
s.listKeysStmt = s.Prepare(s.listKeysQuery)
|
||||
@@ -524,6 +530,34 @@ func (s *Store) RegisterAfterDeleteAll(f func(txC transaction.Client) error) {
|
||||
s.afterDeleteAll = append(s.afterDeleteAll, f)
|
||||
}
|
||||
|
||||
func (s *Store) RegisterBeforeDropAll(f func(txC transaction.Client) error) {
|
||||
s.beforeDropAll = append(s.beforeDropAll, f)
|
||||
}
|
||||
|
||||
// DropAll effectively removes the store from the database. The store must be
|
||||
// recreated with NewStore.
|
||||
//
|
||||
// The store shouldn't be used once DropAll is called.
|
||||
func (s *Store) DropAll(ctx context.Context) error {
|
||||
err := s.WithTransaction(ctx, true, func(tx transaction.Client) error {
|
||||
err := s.runBeforeDropAll(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.Stmt(s.dropBaseStmt).Exec(s.GetName())
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: s.dropBaseQuery, Err: err}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("dropall for %q: %w", s.GetName(), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// runAfterAdd executes functions registered to run after add event
|
||||
func (s *Store) runAfterAdd(key string, obj any, txC transaction.Client) error {
|
||||
for _, f := range s.afterAdd {
|
||||
@@ -568,3 +602,13 @@ func (s *Store) runAfterDeleteAll(txC transaction.Client) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) runBeforeDropAll(txC transaction.Client) error {
|
||||
for _, f := range s.beforeDropAll {
|
||||
err := f(txC)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@@ -1028,6 +1028,7 @@ func SetupMockDB(t *testing.T) (*MockClient, *MockTXClient) {
|
||||
dbC.EXPECT().Prepare(fmt.Sprintf(upsertStmtFmt, "testStoreObject")).Return(&sql.Stmt{})
|
||||
dbC.EXPECT().Prepare(fmt.Sprintf(deleteStmtFmt, "testStoreObject")).Return(&sql.Stmt{})
|
||||
dbC.EXPECT().Prepare(fmt.Sprintf(deleteAllStmtFmt, "testStoreObject")).Return(&sql.Stmt{})
|
||||
dbC.EXPECT().Prepare(fmt.Sprintf(dropBaseStmtFmt, "testStoreObject")).Return(&sql.Stmt{})
|
||||
dbC.EXPECT().Prepare(fmt.Sprintf(getStmtFmt, "testStoreObject")).Return(&sql.Stmt{})
|
||||
dbC.EXPECT().Prepare(fmt.Sprintf(listStmtFmt, "testStoreObject")).Return(&sql.Stmt{})
|
||||
dbC.EXPECT().Prepare(fmt.Sprintf(listKeysStmtFmt, "testStoreObject")).Return(&sql.Stmt{})
|
||||
|
@@ -45,6 +45,20 @@ func (m *MockByOptionsLister) EXPECT() *MockByOptionsListerMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// DropAll mocks base method.
|
||||
func (m *MockByOptionsLister) DropAll(arg0 context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "DropAll", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// DropAll indicates an expected call of DropAll.
|
||||
func (mr *MockByOptionsListerMockRecorder) DropAll(arg0 any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DropAll", reflect.TypeOf((*MockByOptionsLister)(nil).DropAll), arg0)
|
||||
}
|
||||
|
||||
// GetLatestResourceVersion mocks base method.
|
||||
func (m *MockByOptionsLister) GetLatestResourceVersion() []string {
|
||||
m.ctrl.T.Helper()
|
||||
|
Reference in New Issue
Block a user