mirror of
https://github.com/rancher/steve.git
synced 2025-07-17 08:21:41 +00:00
267 lines
8.0 KiB
Go
267 lines
8.0 KiB
Go
package informer
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/rancher/steve/pkg/sqlcache/db"
|
|
"github.com/rancher/steve/pkg/sqlcache/db/transaction"
|
|
"k8s.io/client-go/tools/cache"
|
|
)
|
|
|
|
const (
|
|
selectQueryFmt = `
|
|
SELECT object, objectnonce, dekid FROM "%[1]s"
|
|
WHERE key IN (
|
|
SELECT key FROM "%[1]s_indices"
|
|
WHERE name = ? AND value IN (?%s)
|
|
)
|
|
`
|
|
createTableFmt = `CREATE TABLE IF NOT EXISTS "%[1]s_indices" (
|
|
name TEXT NOT NULL,
|
|
value TEXT NOT NULL,
|
|
key TEXT NOT NULL REFERENCES "%[1]s"(key) ON DELETE CASCADE,
|
|
PRIMARY KEY (name, value, key)
|
|
)`
|
|
createIndexFmt = `CREATE INDEX IF NOT EXISTS "%[1]s_indices_index" ON "%[1]s_indices"(name, value)`
|
|
|
|
deleteIndicesFmt = `DELETE FROM "%s_indices" WHERE key = ?`
|
|
addIndexFmt = `INSERT INTO "%s_indices" (name, value, key) VALUES (?, ?, ?) ON CONFLICT DO NOTHING`
|
|
listByIndexFmt = `SELECT object, objectnonce, dekid FROM "%[1]s"
|
|
WHERE key IN (
|
|
SELECT key FROM "%[1]s_indices"
|
|
WHERE name = ? AND value = ?
|
|
)`
|
|
listKeyByIndexFmt = `SELECT DISTINCT key FROM "%s_indices" WHERE name = ? AND value = ?`
|
|
listIndexValuesFmt = `SELECT DISTINCT value FROM "%s_indices" WHERE name = ?`
|
|
)
|
|
|
|
// Indexer is a SQLite-backed cache.Indexer which builds upon Store adding an index table
|
|
type Indexer struct {
|
|
ctx context.Context
|
|
|
|
Store
|
|
indexers cache.Indexers
|
|
indexersLock sync.RWMutex
|
|
|
|
deleteIndicesQuery string
|
|
addIndexQuery string
|
|
listByIndexQuery string
|
|
listKeysByIndexQuery string
|
|
listIndexValuesQuery string
|
|
|
|
deleteIndicesStmt *sql.Stmt
|
|
addIndexStmt *sql.Stmt
|
|
listByIndexStmt *sql.Stmt
|
|
listKeysByIndexStmt *sql.Stmt
|
|
listIndexValuesStmt *sql.Stmt
|
|
}
|
|
|
|
var _ cache.Indexer = (*Indexer)(nil)
|
|
|
|
type Store interface {
|
|
db.Client
|
|
cache.Store
|
|
|
|
GetByKey(key string) (item any, exists bool, err error)
|
|
GetName() string
|
|
RegisterAfterAdd(f func(key string, obj any, tx transaction.Client) error)
|
|
RegisterAfterUpdate(f func(key string, obj any, tx transaction.Client) error)
|
|
RegisterAfterDelete(f func(key string, obj any, tx transaction.Client) error)
|
|
RegisterAfterDeleteAll(f func(tx transaction.Client) error)
|
|
GetShouldEncrypt() bool
|
|
GetType() reflect.Type
|
|
}
|
|
|
|
// NewIndexer returns a cache.Indexer backed by SQLite for objects of the given example type
|
|
func NewIndexer(ctx context.Context, indexers cache.Indexers, s Store) (*Indexer, error) {
|
|
dbName := db.Sanitize(s.GetName())
|
|
|
|
err := s.WithTransaction(ctx, true, func(tx transaction.Client) error {
|
|
createTableQuery := fmt.Sprintf(createTableFmt, dbName)
|
|
_, err := tx.Exec(createTableQuery)
|
|
if err != nil {
|
|
return &db.QueryError{QueryString: createTableQuery, Err: err}
|
|
}
|
|
createIndexQuery := fmt.Sprintf(createIndexFmt, dbName)
|
|
_, err = tx.Exec(createIndexQuery)
|
|
if err != nil {
|
|
return &db.QueryError{QueryString: createIndexQuery, Err: err}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
i := &Indexer{
|
|
ctx: ctx,
|
|
Store: s,
|
|
indexers: indexers,
|
|
}
|
|
i.RegisterAfterAdd(i.AfterUpsert)
|
|
i.RegisterAfterUpdate(i.AfterUpsert)
|
|
|
|
i.deleteIndicesQuery = fmt.Sprintf(deleteIndicesFmt, db.Sanitize(s.GetName()))
|
|
i.addIndexQuery = fmt.Sprintf(addIndexFmt, db.Sanitize(s.GetName()))
|
|
i.listByIndexQuery = fmt.Sprintf(listByIndexFmt, db.Sanitize(s.GetName()))
|
|
i.listKeysByIndexQuery = fmt.Sprintf(listKeyByIndexFmt, db.Sanitize(s.GetName()))
|
|
i.listIndexValuesQuery = fmt.Sprintf(listIndexValuesFmt, db.Sanitize(s.GetName()))
|
|
|
|
i.deleteIndicesStmt = s.Prepare(i.deleteIndicesQuery)
|
|
i.addIndexStmt = s.Prepare(i.addIndexQuery)
|
|
i.listByIndexStmt = s.Prepare(i.listByIndexQuery)
|
|
i.listKeysByIndexStmt = s.Prepare(i.listKeysByIndexQuery)
|
|
i.listIndexValuesStmt = s.Prepare(i.listIndexValuesQuery)
|
|
|
|
return i, nil
|
|
}
|
|
|
|
/* Core methods */
|
|
|
|
// AfterUpsert updates indices of an object
|
|
func (i *Indexer) AfterUpsert(key string, obj any, tx transaction.Client) error {
|
|
// delete all
|
|
_, err := tx.Stmt(i.deleteIndicesStmt).Exec(key)
|
|
if err != nil {
|
|
return &db.QueryError{QueryString: i.deleteIndicesQuery, Err: err}
|
|
}
|
|
|
|
// re-insert all values
|
|
i.indexersLock.RLock()
|
|
defer i.indexersLock.RUnlock()
|
|
for indexName, indexFunc := range i.indexers {
|
|
values, err := indexFunc(obj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, value := range values {
|
|
_, err = tx.Stmt(i.addIndexStmt).Exec(indexName, value, key)
|
|
if err != nil {
|
|
return &db.QueryError{QueryString: i.addIndexQuery, Err: err}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
/* Satisfy cache.Indexer */
|
|
|
|
// Index returns a list of items that match the given object on the index function
|
|
func (i *Indexer) Index(indexName string, obj any) (result []any, err error) {
|
|
i.indexersLock.RLock()
|
|
defer i.indexersLock.RUnlock()
|
|
indexFunc := i.indexers[indexName]
|
|
if indexFunc == nil {
|
|
return nil, fmt.Errorf("index with name %s does not exist", indexName)
|
|
}
|
|
|
|
values, err := indexFunc(obj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(values) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// typical case
|
|
if len(values) == 1 {
|
|
return i.ByIndex(indexName, values[0])
|
|
}
|
|
|
|
// atypical case - more than one value to lookup
|
|
// HACK: sql.Statement.Query does not allow to pass slices in as of go 1.19 - create an ad-hoc statement
|
|
query := fmt.Sprintf(selectQueryFmt, db.Sanitize(i.GetName()), strings.Repeat(", ?", len(values)-1))
|
|
stmt := i.Prepare(query)
|
|
|
|
defer func() {
|
|
cerr := i.CloseStmt(stmt)
|
|
if cerr != nil {
|
|
err = errors.Join(err, &db.QueryError{QueryString: query, Err: cerr})
|
|
}
|
|
}()
|
|
// HACK: Query will accept []any but not []string
|
|
params := []any{indexName}
|
|
for _, value := range values {
|
|
params = append(params, value)
|
|
}
|
|
|
|
rows, err := i.QueryForRows(i.ctx, stmt, params...)
|
|
if err != nil {
|
|
return nil, &db.QueryError{QueryString: query, Err: err}
|
|
}
|
|
return i.ReadObjects(rows, i.GetType(), i.GetShouldEncrypt())
|
|
}
|
|
|
|
// ByIndex returns the stored objects whose set of indexed values
|
|
// for the named index includes the given indexed value
|
|
func (i *Indexer) ByIndex(indexName, indexedValue string) ([]any, error) {
|
|
rows, err := i.QueryForRows(i.ctx, i.listByIndexStmt, indexName, indexedValue)
|
|
if err != nil {
|
|
return nil, &db.QueryError{QueryString: i.listByIndexQuery, Err: err}
|
|
}
|
|
return i.ReadObjects(rows, i.GetType(), i.GetShouldEncrypt())
|
|
}
|
|
|
|
// IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value
|
|
func (i *Indexer) IndexKeys(indexName, indexedValue string) ([]string, error) {
|
|
i.indexersLock.RLock()
|
|
defer i.indexersLock.RUnlock()
|
|
indexFunc := i.indexers[indexName]
|
|
if indexFunc == nil {
|
|
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
|
|
}
|
|
|
|
rows, err := i.QueryForRows(i.ctx, i.listKeysByIndexStmt, indexName, indexedValue)
|
|
if err != nil {
|
|
return nil, &db.QueryError{QueryString: i.listKeysByIndexQuery, Err: err}
|
|
}
|
|
return i.ReadStrings(rows)
|
|
}
|
|
|
|
// ListIndexFuncValues wraps safeListIndexFuncValues and panics in case of I/O errors
|
|
func (i *Indexer) ListIndexFuncValues(name string) []string {
|
|
result, err := i.safeListIndexFuncValues(name)
|
|
if err != nil {
|
|
panic(fmt.Errorf("unexpected error in safeListIndexFuncValues: %w", err))
|
|
}
|
|
return result
|
|
}
|
|
|
|
// safeListIndexFuncValues returns all the indexed values of the given index
|
|
func (i *Indexer) safeListIndexFuncValues(indexName string) ([]string, error) {
|
|
rows, err := i.QueryForRows(i.ctx, i.listIndexValuesStmt, indexName)
|
|
if err != nil {
|
|
return nil, &db.QueryError{QueryString: i.listIndexValuesQuery, Err: err}
|
|
}
|
|
return i.ReadStrings(rows)
|
|
}
|
|
|
|
// GetIndexers returns the indexers
|
|
func (i *Indexer) GetIndexers() cache.Indexers {
|
|
i.indexersLock.RLock()
|
|
defer i.indexersLock.RUnlock()
|
|
return i.indexers
|
|
}
|
|
|
|
// AddIndexers adds more indexers to this Store. If you call this after you already have data
|
|
// in the Store, the results are undefined.
|
|
func (i *Indexer) AddIndexers(newIndexers cache.Indexers) error {
|
|
i.indexersLock.Lock()
|
|
defer i.indexersLock.Unlock()
|
|
if i.indexers == nil {
|
|
i.indexers = make(map[string]cache.IndexFunc)
|
|
}
|
|
for k, v := range newIndexers {
|
|
i.indexers[k] = v
|
|
}
|
|
return nil
|
|
}
|