1
0
mirror of https://github.com/rancher/steve.git synced 2025-07-13 14:44:48 +00:00
steve/pkg/sqlcache/store/store.go

500 lines
14 KiB
Go
Raw Normal View History

/*
Package store contains the SQL-backed store. It persists objects to a SQLite database.
*/
package store
import (
"context"
"database/sql"
"fmt"
"reflect"
"github.com/rancher/lasso/pkg/log"
"github.com/rancher/steve/pkg/sqlcache/db"
"github.com/rancher/steve/pkg/sqlcache/db/transaction"
"github.com/rancher/steve/pkg/sqlcache/sqltypes"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
// needed for drivers
_ "modernc.org/sqlite"
)
// SQL templates used by Store. Each one contains a single %s verb that NewStore
// fills in with the sanitized table name (db.Sanitize of the store name).
const (
// upsertStmtFmt inserts a row or replaces the existing row with the same key.
upsertStmtFmt = `REPLACE INTO "%s"(key, object, objectnonce, dekid) VALUES (?, ?, ?, ?)`
// deleteStmtFmt removes the row with the given key.
deleteStmtFmt = `DELETE FROM "%s" WHERE key = ?`
// deleteAllStmtFmt removes every row from the table.
deleteAllStmtFmt = `DELETE FROM "%s"`
// getStmtFmt selects the stored columns for a single key.
getStmtFmt = `SELECT object, objectnonce, dekid FROM "%s" WHERE key = ?`
// listStmtFmt selects the stored columns for every row.
listStmtFmt = `SELECT object, objectnonce, dekid FROM "%s"`
// listKeysStmtFmt selects every key in the table.
listKeysStmtFmt = `SELECT key FROM "%s"`
// createTableFmt creates the backing table if it does not exist yet.
// NOTE(review): objectnonce and dekid presumably hold the encryption nonce and
// data-encryption-key id for encrypted objects; confirm against the db package.
createTableFmt = `CREATE TABLE IF NOT EXISTS "%s" (
key TEXT UNIQUE NOT NULL PRIMARY KEY,
object BLOB,
objectnonce BLOB,
dekid INTEGER
)`
)
// Store is a SQLite-backed cache.Store
//
// All database access goes through the embedded db.Client. Instances are
// expected to be built with NewStore, which creates the table and prepares the
// statements below.
type Store struct {
// Client is the database client used for transactions, statement
// preparation, queries and row decoding.
db.Client
// ctx is the context given to NewStore; it is passed to every transaction and query.
ctx context.Context
// gvk is the GroupVersionKind given to NewStore.
gvk schema.GroupVersionKind
// name is the store name; its sanitized form is the table name. Returned by GetName.
name string
// externalUpdateInfo, when non-nil, makes Add and Update refresh fields that
// depend on other tables (see checkUpdateExternalInfo).
externalUpdateInfo *sqltypes.ExternalGVKUpdates
// typ is reflect.TypeOf the example object given to NewStore; it is passed to ReadObjects.
typ reflect.Type
// keyFunc computes the storage key of an object.
keyFunc cache.KeyFunc
// shouldEncrypt is passed through to Upsert and ReadObjects.
shouldEncrypt bool
// The *Query fields hold the SQL text of the statements below; they are used
// when building db.QueryError values and log messages.
upsertQuery string
deleteQuery string
deleteAllQuery string
getQuery string
listQuery string
listKeysQuery string
// The *Stmt fields are the statements prepared by NewStore from the queries above.
upsertStmt *sql.Stmt
deleteStmt *sql.Stmt
deleteAllStmt *sql.Stmt
getStmt *sql.Stmt
listStmt *sql.Stmt
listKeysStmt *sql.Stmt
// Hooks run inside the same transaction as the triggering operation, in
// registration order; the first error stops the remaining hooks.
afterAdd []func(key string, obj any, tx transaction.Client) error
afterUpdate []func(key string, obj any, tx transaction.Client) error
afterDelete []func(key string, obj any, tx transaction.Client) error
afterDeleteAll []func(tx transaction.Client) error
}
// Compile-time check that *Store implements cache.Store
var _ cache.Store = (*Store)(nil)
// NewStore builds a SQLite-backed cache.Store for objects shaped like example.
//
// The table name is the sanitized form of name. The table is created (if it is
// missing) inside a transaction, after which every statement the Store needs
// is prepared. An error is returned only when table creation fails.
func NewStore(ctx context.Context, example any, keyFunc cache.KeyFunc, c db.Client, shouldEncrypt bool, gvk schema.GroupVersionKind, name string, externalUpdateInfo *sqltypes.ExternalGVKUpdates) (*Store, error) {
	table := db.Sanitize(name)

	s := &Store{
		Client:             c,
		ctx:                ctx,
		gvk:                gvk,
		name:               name,
		externalUpdateInfo: externalUpdateInfo,
		typ:                reflect.TypeOf(example),
		keyFunc:            keyFunc,
		shouldEncrypt:      shouldEncrypt,

		upsertQuery:    fmt.Sprintf(upsertStmtFmt, table),
		deleteQuery:    fmt.Sprintf(deleteStmtFmt, table),
		deleteAllQuery: fmt.Sprintf(deleteAllStmtFmt, table),
		getQuery:       fmt.Sprintf(getStmtFmt, table),
		listQuery:      fmt.Sprintf(listStmtFmt, table),
		listKeysQuery:  fmt.Sprintf(listKeysStmtFmt, table),

		afterAdd:       make([]func(key string, obj any, tx transaction.Client) error, 0),
		afterUpdate:    make([]func(key string, obj any, tx transaction.Client) error, 0),
		afterDelete:    make([]func(key string, obj any, tx transaction.Client) error, 0),
		afterDeleteAll: make([]func(tx transaction.Client) error, 0),
	}

	// Once multiple informer factories are needed, a "table already exists"
	// error could be tolerated here.
	createTableQuery := fmt.Sprintf(createTableFmt, table)
	if err := s.WithTransaction(ctx, true, func(tx transaction.Client) error {
		if _, execErr := tx.Exec(createTableQuery); execErr != nil {
			return &db.QueryError{QueryString: createTableQuery, Err: execErr}
		}
		return nil
	}); err != nil {
		return nil, err
	}

	// The statements reference the table, so they are prepared only after it exists.
	s.upsertStmt = s.Prepare(s.upsertQuery)
	s.deleteStmt = s.Prepare(s.deleteQuery)
	s.deleteAllStmt = s.Prepare(s.deleteAllQuery)
	s.getStmt = s.Prepare(s.getQuery)
	s.listStmt = s.Prepare(s.listQuery)
	s.listKeysStmt = s.Prepare(s.listKeysQuery)

	return s, nil
}
// checkUpdateExternalInfo refreshes externally-derived fields after key was
// written. It does nothing when the Store has no external update info.
// Failures reported by updateExternalInfo are logged and swallowed, so the
// only error that can surface is one from WithTransaction itself.
func (s *Store) checkUpdateExternalInfo(key string) error {
	info := s.externalUpdateInfo
	if info == nil {
		return nil
	}

	refresh := func(tx transaction.Client) error {
		err := s.updateExternalInfo(tx, key, info)
		if err != nil {
			// Best effort: report the problem but do not fail the transaction.
			logrus.Errorf("Error updating external info %v: %s", info, err)
		}
		return nil
	}
	return s.WithTransaction(s.ctx, true, refresh)
}
// updateExternalInfo brings fields that are copied from another table back in
// sync with their source of truth.
//
// For every dependency in externalUpdateInfo it selects the rows of
// "<SourceGVK>_fields" whose TargetFinalFieldName column differs from the
// matching row of "<TargetGVK>_fields" and rewrites that column on tx.
// Label dependencies locate the matching row through "<SourceGVK>_labels";
// plain dependencies join the two _fields tables on SourceFieldName =
// TargetKeyFieldName.
//
// key is only used in log messages. Every failure is logged and skipped, so
// the returned error is always nil.
func (s *Store) updateExternalInfo(tx transaction.Client, key string, externalUpdateInfo *sqltypes.ExternalGVKUpdates) error {
	for _, labelDep := range externalUpdateInfo.ExternalLabelDependencies {
		rawGetStmt := fmt.Sprintf(`SELECT DISTINCT f.key, ex2."%s" FROM "%s_fields" f
LEFT OUTER JOIN "%s_labels" lt1 ON f.key = lt1.key
JOIN "%s_fields" ex2 ON lt1.value = ex2."%s"
WHERE lt1.label = ? AND f."%s" != ex2."%s"`,
			labelDep.TargetFinalFieldName,
			labelDep.SourceGVK,
			labelDep.SourceGVK,
			labelDep.TargetGVK,
			labelDep.TargetKeyFieldName,
			labelDep.TargetFinalFieldName,
			labelDep.TargetFinalFieldName,
		)
		rawUpdateStmt := fmt.Sprintf(`UPDATE "%s_fields" SET "%s" = ? WHERE key = ?`,
			labelDep.SourceGVK, labelDep.TargetFinalFieldName)
		s.syncExternalField(tx, key, labelDep.TargetGVK, rawGetStmt, rawUpdateStmt, labelDep.SourceLabelName)
	}

	for _, nonLabelDep := range externalUpdateInfo.ExternalDependencies {
		rawGetStmt := fmt.Sprintf(`SELECT f.key, ex2."%s"
FROM "%s_fields" f JOIN "%s_fields" ex2 ON f."%s" = ex2."%s"
WHERE f."%s" != ex2."%s"`,
			nonLabelDep.TargetFinalFieldName,
			nonLabelDep.SourceGVK,
			nonLabelDep.TargetGVK,
			nonLabelDep.SourceFieldName,
			nonLabelDep.TargetKeyFieldName,
			nonLabelDep.TargetFinalFieldName,
			nonLabelDep.TargetFinalFieldName)
		rawUpdateStmt := fmt.Sprintf(`UPDATE "%s_fields" SET "%s" = ? WHERE key = ?`,
			nonLabelDep.SourceGVK, nonLabelDep.TargetFinalFieldName)
		// NOTE(review): this SELECT has no "?" placeholder, yet SourceFieldName is
		// still passed as a query argument. It is kept to preserve the existing
		// call; confirm whether the driver ignores it and drop it if so.
		s.syncExternalField(tx, key, nonLabelDep.TargetGVK, rawGetStmt, rawUpdateStmt, nonLabelDep.SourceFieldName)
	}
	return nil
}

// syncExternalField runs rawGetStmt (expected to yield source key / new value
// pairs) with queryArgs and applies rawUpdateStmt on tx for each returned pair.
// targetGVK and key only appear in log messages. Errors are logged, not returned.
func (s *Store) syncExternalField(tx transaction.Client, key, targetGVK, rawGetStmt, rawUpdateStmt string, queryArgs ...any) {
	// The lookup goes through the Store's client rather than tx, as before.
	// NOTE(review): statements prepared in this function are never closed here;
	// verify whether the db client tracks or caches them, otherwise they accumulate.
	getStmt := s.Prepare(rawGetStmt)
	rows, err := s.QueryForRows(s.ctx, getStmt, queryArgs...)
	if err != nil {
		logrus.Infof("Error getting external info for table %s, key %s: %v", targetGVK, key, &db.QueryError{QueryString: rawGetStmt, Err: err})
		return
	}
	result, err := s.ReadStrings2(rows)
	if err != nil {
		logrus.Infof("Error reading objects for table %s, key %s: %s", targetGVK, key, err)
		return
	}
	if len(result) == 0 {
		// Nothing is out of date; avoid preparing the UPDATE at all.
		return
	}

	// The UPDATE text is identical for every row, so prepare it once rather
	// than once per row.
	updateStmt := tx.Stmt(s.Prepare(rawUpdateStmt))
	for _, row := range result {
		sourceKey, finalTargetValue := row[0], row[1]
		if _, err := updateStmt.Exec(finalTargetValue, sourceKey); err != nil {
			// Keep going: one failed row must not block the others.
			logrus.Infof("Error running %s(%s, %s): %s", rawUpdateStmt, finalTargetValue, sourceKey, err)
		}
	}
}
// deleteByKey removes the row stored under key and then runs the after-delete
// hooks with obj, all inside one transaction. A failing DELETE is reported as
// a db.QueryError; a failing hook's error is returned unchanged.
func (s *Store) deleteByKey(key string, obj any) error {
	remove := func(tx transaction.Client) error {
		if _, execErr := tx.Stmt(s.deleteStmt).Exec(key); execErr != nil {
			return &db.QueryError{QueryString: s.deleteQuery, Err: execErr}
		}
		return s.runAfterDelete(key, obj, tx)
	}
	return s.WithTransaction(s.ctx, true, remove)
}
// GetByKey looks up the object stored under key.
// exists is false (with a nil item and nil error) when no row matches.
// A query failure is returned as a db.QueryError; a decoding failure is
// returned as reported by ReadObjects.
func (s *Store) GetByKey(key string) (item any, exists bool, err error) {
	rows, queryErr := s.QueryForRows(s.ctx, s.getStmt, key)
	if queryErr != nil {
		return nil, false, &db.QueryError{QueryString: s.getQuery, Err: queryErr}
	}

	objs, readErr := s.ReadObjects(rows, s.typ, s.shouldEncrypt)
	switch {
	case readErr != nil:
		return nil, false, readErr
	case len(objs) == 0:
		return nil, false, nil
	}
	return objs[0], true, nil
}
/* Satisfy cache.Store */
/* Core methods */
// Add stores obj under the key computed by keyFunc, replacing any existing
// entry, and runs the after-add hooks in the same transaction. On success the
// external dependent fields are refreshed via checkUpdateExternalInfo.
// Transaction failures are logged and returned.
func (s *Store) Add(obj any) error {
	key, err := s.keyFunc(obj)
	if err != nil {
		return err
	}

	insert := func(tx transaction.Client) error {
		if upsertErr := s.Upsert(tx, s.upsertStmt, key, obj, s.shouldEncrypt); upsertErr != nil {
			return &db.QueryError{QueryString: s.upsertQuery, Err: upsertErr}
		}
		return s.runAfterAdd(key, obj, tx)
	}
	if err = s.WithTransaction(s.ctx, true, insert); err != nil {
		log.Errorf("Error in Store.Add for type %v: %v", s.name, err)
		return err
	}

	return s.checkUpdateExternalInfo(key)
}
// Update stores obj under the key computed by keyFunc, inserting it when it is
// not present yet, and runs the after-update hooks in the same transaction.
// On success the external dependent fields are refreshed via
// checkUpdateExternalInfo. Transaction failures are logged and returned.
func (s *Store) Update(obj any) error {
	key, err := s.keyFunc(obj)
	if err != nil {
		return err
	}

	write := func(tx transaction.Client) error {
		if upsertErr := s.Upsert(tx, s.upsertStmt, key, obj, s.shouldEncrypt); upsertErr != nil {
			return &db.QueryError{QueryString: s.upsertQuery, Err: upsertErr}
		}
		return s.runAfterUpdate(key, obj, tx)
	}
	if err = s.WithTransaction(s.ctx, true, write); err != nil {
		log.Errorf("Error in Store.Update for type %v: %v", s.name, err)
		return err
	}

	return s.checkUpdateExternalInfo(key)
}
// Delete removes obj from the Store, using keyFunc to find its key.
// Failures from the underlying delete are logged and returned.
func (s *Store) Delete(obj any) error {
	key, keyErr := s.keyFunc(obj)
	if keyErr != nil {
		return keyErr
	}

	if delErr := s.deleteByKey(key, obj); delErr != nil {
		log.Errorf("Error in Store.Delete for type %v: %v", s.name, delErr)
		return delErr
	}
	return nil
}
// List returns every object currently held by the Store.
// Note: the cache.Store signature leaves no room for an error, so query and
// decoding failures panic.
func (s *Store) List() []any {
	rows, queryErr := s.QueryForRows(s.ctx, s.listStmt)
	if queryErr != nil {
		panic(&db.QueryError{QueryString: s.listQuery, Err: queryErr})
	}

	objs, readErr := s.ReadObjects(rows, s.typ, s.shouldEncrypt)
	if readErr != nil {
		panic(fmt.Errorf("error in Store.List: %w", readErr))
	}
	return objs
}
// ListKeys returns a list of all the keys currently in this Store.
//
// The signature cannot carry an error, so failures are logged and an empty,
// non-nil slice is returned instead.
// Note: Atm it doesn't appear that returning an empty list in the case of an
// error has any detrimental effects. An error is not uncommon enough nor does
// it appear to necessitate a panic.
func (s *Store) ListKeys() []string {
	rows, err := s.QueryForRows(s.ctx, s.listKeysStmt)
	if err != nil {
		// Use the same logger as the other Store methods instead of writing to stdout.
		log.Errorf("Unexpected error in store.ListKeys: while executing query: %s got error: %v", s.listKeysQuery, err)
		return []string{}
	}
	result, err := s.ReadStrings(rows)
	if err != nil {
		log.Errorf("Unexpected error in store.ListKeys: %v", err)
		return []string{}
	}
	return result
}
// Get looks up the stored object that shares its key with obj.
// The key is computed with keyFunc; the lookup itself is done by GetByKey.
func (s *Store) Get(obj any) (item any, exists bool, err error) {
	var key string
	if key, err = s.keyFunc(obj); err != nil {
		return nil, false, err
	}
	return s.GetByKey(key)
}
// Replace discards the Store's contents and stores the given objects instead.
// Objects are keyed with keyFunc first; if any key cannot be computed the
// Store is left untouched and that error is returned. The second argument is
// ignored. Failures of the replacement itself are logged and returned.
func (s *Store) Replace(objects []any, _ string) error {
	byKey := make(map[string]any, len(objects))
	for i := range objects {
		key, keyErr := s.keyFunc(objects[i])
		if keyErr != nil {
			return keyErr
		}
		byKey[key] = objects[i]
	}

	if err := s.replaceByKey(byKey); err != nil {
		log.Errorf("Error in Store.Replace for type %v: %v", s.name, err)
		return err
	}
	return nil
}
// replaceByKey deletes the contents of the Store and stores the given
// key-to-object map instead, all inside a single transaction.
//
// The after-delete-all hooks run once after the table is emptied, and the
// after-add hooks run after each object is written. Failing statements are
// reported as db.QueryError, matching Add and Update; hook errors are returned
// unchanged. Map iteration order, and therefore write order, is unspecified.
func (s *Store) replaceByKey(objects map[string]any) error {
	return s.WithTransaction(s.ctx, true, func(txC transaction.Client) error {
		if _, err := txC.Stmt(s.deleteAllStmt).Exec(); err != nil {
			return &db.QueryError{QueryString: s.deleteAllQuery, Err: err}
		}
		if err := s.runAfterDeleteAll(txC); err != nil {
			return err
		}

		for key, obj := range objects {
			if err := s.Upsert(txC, s.upsertStmt, key, obj, s.shouldEncrypt); err != nil {
				// Wrap like Add/Update do so the failing query is visible to callers.
				return &db.QueryError{QueryString: s.upsertQuery, Err: err}
			}
			if err := s.runAfterAdd(key, obj, txC); err != nil {
				return err
			}
		}
		return nil
	})
}
// Resync is a no-op and is deprecated. It always returns nil and is present
// only so that Store satisfies cache.Store.
func (s *Store) Resync() error {
return nil
}
/* Utilities */
// GetName returns the name this Store was created with.
func (s *Store) GetName() string {
return s.name
}
// GetShouldEncrypt returns the shouldEncrypt flag this Store was created with.
func (s *Store) GetShouldEncrypt() bool {
return s.shouldEncrypt
}
// GetType returns the reflect.Type of the example object given to NewStore.
func (s *Store) GetType() reflect.Type {
return s.typ
}
// RegisterAfterAdd registers a func to be called after each add event.
// Hooks are appended, so they run in registration order. f receives the key,
// the object and the transaction the write is running in.
func (s *Store) RegisterAfterAdd(f func(key string, obj any, txC transaction.Client) error) {
s.afterAdd = append(s.afterAdd, f)
}
// RegisterAfterUpdate registers a func to be called after each update event.
// Hooks are appended, so they run in registration order. f receives the key,
// the object and the transaction the write is running in.
func (s *Store) RegisterAfterUpdate(f func(key string, obj any, txC transaction.Client) error) {
s.afterUpdate = append(s.afterUpdate, f)
}
// RegisterAfterDelete registers a func to be called after each deletion.
// Hooks are appended, so they run in registration order. f receives the key,
// the deleted object and the transaction the delete is running in.
func (s *Store) RegisterAfterDelete(f func(key string, obj any, txC transaction.Client) error) {
s.afterDelete = append(s.afterDelete, f)
}
// RegisterAfterDeleteAll registers a func to be called after all rows have been
// deleted. Hooks are appended, so they run in registration order. f receives
// only the transaction the delete-all is running in.
func (s *Store) RegisterAfterDeleteAll(f func(txC transaction.Client) error) {
s.afterDeleteAll = append(s.afterDeleteAll, f)
}
// runAfterAdd invokes the registered after-add hooks in registration order,
// stopping at (and returning) the first error.
func (s *Store) runAfterAdd(key string, obj any, txC transaction.Client) error {
	for _, hook := range s.afterAdd {
		if hookErr := hook(key, obj, txC); hookErr != nil {
			return hookErr
		}
	}
	return nil
}
// runAfterUpdate invokes the registered after-update hooks in registration
// order, stopping at (and returning) the first error.
func (s *Store) runAfterUpdate(key string, obj any, txC transaction.Client) error {
	for _, hook := range s.afterUpdate {
		if hookErr := hook(key, obj, txC); hookErr != nil {
			return hookErr
		}
	}
	return nil
}
// runAfterDelete invokes the registered after-delete hooks in registration
// order, stopping at (and returning) the first error.
func (s *Store) runAfterDelete(key string, obj any, txC transaction.Client) error {
	for _, hook := range s.afterDelete {
		if hookErr := hook(key, obj, txC); hookErr != nil {
			return hookErr
		}
	}
	return nil
}
// runAfterDeleteAll invokes the hooks registered for the delete-all step that
// happens when the Store's contents are replaced. Hooks run in registration
// order, stopping at (and returning) the first error.
func (s *Store) runAfterDeleteAll(txC transaction.Client) error {
	for _, hook := range s.afterDeleteAll {
		if hookErr := hook(txC); hookErr != nil {
			return hookErr
		}
	}
	return nil
}