mirror of
https://github.com/rancher/steve.git
synced 2025-09-05 01:12:09 +00:00
Keep track of events for SQL cache watches (#661)
* Keep track of past events * Clarify RV acronym * Fix comment typo * Rename listEvents variables * Improve filter readability * Move defer
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
package informer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
@@ -33,9 +35,15 @@ type ListOptionIndexer struct {
|
||||
namespaced bool
|
||||
indexedFields []string
|
||||
|
||||
latestRVLock sync.RWMutex
|
||||
latestRV string
|
||||
|
||||
watchersLock sync.RWMutex
|
||||
watchers map[*watchKey]*watcher
|
||||
|
||||
upsertEventsQuery string
|
||||
findEventsRowByRVQuery string
|
||||
listEventsAfterQuery string
|
||||
addFieldsQuery string
|
||||
deleteFieldsByKeyQuery string
|
||||
deleteFieldsQuery string
|
||||
@@ -43,6 +51,9 @@ type ListOptionIndexer struct {
|
||||
deleteLabelsByKeyQuery string
|
||||
deleteLabelsQuery string
|
||||
|
||||
upsertEventsStmt *sql.Stmt
|
||||
findEventsRowByRVStmt *sql.Stmt
|
||||
listEventsAfterStmt *sql.Stmt
|
||||
addFieldsStmt *sql.Stmt
|
||||
deleteFieldsByKeyStmt *sql.Stmt
|
||||
deleteFieldsStmt *sql.Stmt
|
||||
@@ -63,7 +74,24 @@ const (
|
||||
matchFmt = `%%%s%%`
|
||||
strictMatchFmt = `%s`
|
||||
escapeBackslashDirective = ` ESCAPE '\'` // The leading space is crucial for unit tests only '
|
||||
createFieldsTableFmt = `CREATE TABLE "%s_fields" (
|
||||
|
||||
// RV stands for ResourceVersion
|
||||
createEventsTableFmt = `CREATE TABLE "%s_events" (
|
||||
rv TEXT NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
event BLOB NOT NULL,
|
||||
PRIMARY KEY (type, rv)
|
||||
)`
|
||||
listEventsAfterFmt = `SELECT type, rv, event
|
||||
FROM "%s_events"
|
||||
WHERE rowid > ?
|
||||
`
|
||||
findEventsRowByRVFmt = `SELECT rowid
|
||||
FROM "%s_events"
|
||||
WHERE rv = ?
|
||||
`
|
||||
|
||||
createFieldsTableFmt = `CREATE TABLE "%s_fields" (
|
||||
key TEXT NOT NULL PRIMARY KEY,
|
||||
%s
|
||||
)`
|
||||
@@ -138,6 +166,12 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names
|
||||
setStatements := make([]string, len(indexedFields))
|
||||
|
||||
err = l.WithTransaction(ctx, true, func(tx transaction.Client) error {
|
||||
createEventsTableQuery := fmt.Sprintf(createEventsTableFmt, dbName)
|
||||
_, err = tx.Exec(createEventsTableQuery)
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: createEventsTableFmt, Err: err}
|
||||
}
|
||||
|
||||
_, err = tx.Exec(fmt.Sprintf(createFieldsTableFmt, dbName, strings.Join(columnDefs, ", ")))
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -179,6 +213,18 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l.upsertEventsQuery = fmt.Sprintf(
|
||||
`REPLACE INTO "%s_events"(rv, type, event) VALUES (?, ?, ?)`,
|
||||
dbName,
|
||||
)
|
||||
l.upsertEventsStmt = l.Prepare(l.upsertEventsQuery)
|
||||
|
||||
l.listEventsAfterQuery = fmt.Sprintf(listEventsAfterFmt, dbName)
|
||||
l.listEventsAfterStmt = l.Prepare(l.listEventsAfterQuery)
|
||||
|
||||
l.findEventsRowByRVQuery = fmt.Sprintf(findEventsRowByRVFmt, dbName)
|
||||
l.findEventsRowByRVStmt = l.Prepare(l.findEventsRowByRVQuery)
|
||||
|
||||
l.addFieldsQuery = fmt.Sprintf(
|
||||
`INSERT INTO "%s_fields"(key, %s) VALUES (?, %s) ON CONFLICT DO UPDATE SET %s`,
|
||||
dbName,
|
||||
@@ -204,10 +250,95 @@ func NewListOptionIndexer(ctx context.Context, fields [][]string, s Store, names
|
||||
}
|
||||
|
||||
func (l *ListOptionIndexer) Watch(ctx context.Context, opts WatchOptions, eventsCh chan<- watch.Event) error {
|
||||
key := l.addWatcher(eventsCh, opts.Filter)
|
||||
l.latestRVLock.RLock()
|
||||
latestRV := l.latestRV
|
||||
l.latestRVLock.RUnlock()
|
||||
|
||||
targetRV := opts.ResourceVersion
|
||||
if opts.ResourceVersion == "" {
|
||||
targetRV = latestRV
|
||||
}
|
||||
|
||||
var events []watch.Event
|
||||
var key *watchKey
|
||||
// Even though we're not writing in this transaction, we prevent other writes to SQL
|
||||
// because we don't want to add more events while we're backfilling events, so we don't miss events
|
||||
err := l.WithTransaction(ctx, true, func(tx transaction.Client) error {
|
||||
rowIDRows, err := tx.Stmt(l.findEventsRowByRVStmt).QueryContext(ctx, targetRV)
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: l.listEventsAfterQuery, Err: err}
|
||||
}
|
||||
if !rowIDRows.Next() && targetRV != latestRV {
|
||||
return fmt.Errorf("resourceversion too old")
|
||||
}
|
||||
|
||||
var rowID int
|
||||
rowIDRows.Scan(&rowID)
|
||||
|
||||
// Backfilling previous events from resourceVersion
|
||||
rows, err := tx.Stmt(l.listEventsAfterStmt).QueryContext(ctx, rowID)
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: l.listEventsAfterQuery, Err: err}
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var typ, rv string
|
||||
var buf sql.RawBytes
|
||||
err := rows.Scan(&typ, &rv, &buf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("scanning event row: %w", err)
|
||||
}
|
||||
|
||||
example := &unstructured.Unstructured{}
|
||||
val, err := fromBytes(buf, reflect.TypeOf(example))
|
||||
if err != nil {
|
||||
return fmt.Errorf("decoding event object: %w", err)
|
||||
}
|
||||
|
||||
obj, ok := val.Elem().Interface().(runtime.Object)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
filter := opts.Filter
|
||||
if !matchFilter(filter.ID, filter.Namespace, filter.Selector, obj) {
|
||||
continue
|
||||
}
|
||||
|
||||
events = append(events, watch.Event{
|
||||
Type: watch.EventType(typ),
|
||||
Object: val.Elem().Interface().(runtime.Object),
|
||||
})
|
||||
}
|
||||
|
||||
for _, event := range events {
|
||||
eventsCh <- event
|
||||
}
|
||||
|
||||
key = l.addWatcher(eventsCh, opts.Filter)
|
||||
return nil
|
||||
})
|
||||
<-ctx.Done()
|
||||
l.removeWatcher(key)
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func toBytes(obj any) []byte {
|
||||
var buf bytes.Buffer
|
||||
enc := gob.NewEncoder(&buf)
|
||||
err := enc.Encode(obj)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("error while gobbing object: %w", err))
|
||||
}
|
||||
bb := buf.Bytes()
|
||||
return bb
|
||||
}
|
||||
|
||||
func fromBytes(buf sql.RawBytes, typ reflect.Type) (reflect.Value, error) {
|
||||
dec := gob.NewDecoder(bytes.NewReader(buf))
|
||||
singleResult := reflect.New(typ)
|
||||
err := dec.DecodeValue(singleResult)
|
||||
return singleResult, err
|
||||
}
|
||||
|
||||
type watchKey struct {
|
||||
@@ -268,6 +399,24 @@ func (l *ListOptionIndexer) notifyEventDeleted(key string, obj any, tx transacti
|
||||
}
|
||||
|
||||
func (l *ListOptionIndexer) notifyEvent(eventType watch.EventType, oldObj any, obj any, tx transaction.Client) error {
|
||||
acc, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
latestRV := acc.GetResourceVersion()
|
||||
// Append a -d suffix because the RV might be the same as the previous object
|
||||
// in the following case:
|
||||
// - Add obj1 with RV 100
|
||||
// - Delete obj1 with RV 100
|
||||
if eventType == watch.Deleted {
|
||||
latestRV = latestRV + "-d"
|
||||
}
|
||||
_, err = tx.Stmt(l.upsertEventsStmt).Exec(latestRV, eventType, toBytes(obj))
|
||||
if err != nil {
|
||||
return &db.QueryError{QueryString: l.upsertEventsQuery, Err: err}
|
||||
}
|
||||
|
||||
l.watchersLock.RLock()
|
||||
for _, watcher := range l.watchers {
|
||||
if !matchWatch(watcher.filter.ID, watcher.filter.Namespace, watcher.filter.Selector, oldObj, obj) {
|
||||
@@ -280,6 +429,10 @@ func (l *ListOptionIndexer) notifyEvent(eventType watch.EventType, oldObj any, o
|
||||
}
|
||||
}
|
||||
l.watchersLock.RUnlock()
|
||||
|
||||
l.latestRVLock.Lock()
|
||||
defer l.latestRVLock.Unlock()
|
||||
l.latestRV = latestRV
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -634,7 +787,11 @@ func (l *ListOptionIndexer) executeQuery(ctx context.Context, queryInfo *QueryIn
|
||||
continueToken = fmt.Sprintf("%d", offset+limit)
|
||||
}
|
||||
|
||||
return toUnstructuredList(items), total, continueToken, nil
|
||||
l.latestRVLock.RLock()
|
||||
latestRV := l.latestRV
|
||||
l.latestRVLock.RUnlock()
|
||||
|
||||
return toUnstructuredList(items, latestRV), total, continueToken, nil
|
||||
}
|
||||
|
||||
func (l *ListOptionIndexer) validateColumn(column string) error {
|
||||
@@ -1063,12 +1220,15 @@ func isLabelsFieldList(fields []string) bool {
|
||||
}
|
||||
|
||||
// toUnstructuredList turns a slice of unstructured objects into an unstructured.UnstructuredList
|
||||
func toUnstructuredList(items []any) *unstructured.UnstructuredList {
|
||||
func toUnstructuredList(items []any, resourceVersion string) *unstructured.UnstructuredList {
|
||||
objectItems := make([]any, len(items))
|
||||
result := &unstructured.UnstructuredList{
|
||||
Items: make([]unstructured.Unstructured, len(items)),
|
||||
Object: map[string]interface{}{"items": objectItems},
|
||||
}
|
||||
if resourceVersion != "" {
|
||||
result.SetResourceVersion(resourceVersion)
|
||||
}
|
||||
for i, item := range items {
|
||||
result.Items[i] = *item.(*unstructured.Unstructured)
|
||||
objectItems[i] = item.(*unstructured.Unstructured).Object
|
||||
|
Reference in New Issue
Block a user