1
0
mirror of https://github.com/rancher/steve.git synced 2025-09-17 07:48:52 +00:00
Files
steve/pkg/sqlcache/informer/informer.go

159 lines
5.8 KiB
Go
Raw Normal View History

/*
package sql provides an Informer and Indexer that uses SQLite as a store, instead of an in-memory store like a map.
*/
package informer
import (
"context"
"errors"
"sort"
"strconv"
"time"
"github.com/rancher/steve/pkg/sqlcache/db"
"github.com/rancher/steve/pkg/sqlcache/partition"
"github.com/rancher/steve/pkg/sqlcache/sqltypes"
sqlStore "github.com/rancher/steve/pkg/sqlcache/store"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
)
var defaultRefreshTime = 5 * time.Second
// Informer is a SQLite-backed cache.SharedIndexInformer that can execute queries on listprocessor structs
type Informer struct {
cache.SharedIndexInformer
ByOptionsLister
}
type WatchOptions struct {
ResourceVersion string
Filter WatchFilter
}
type WatchFilter struct {
ID string
Selector labels.Selector
Namespace string
}
type ByOptionsLister interface {
ListByOptions(ctx context.Context, lo *sqltypes.ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error)
Watch(ctx context.Context, options WatchOptions, eventsCh chan<- watch.Event) error
GetLatestResourceVersion() []string
}
// this is set to a var so that it can be overridden by test code for mocking purposes
var newInformer = cache.NewSharedIndexInformer
// NewInformer returns a new SQLite-backed Informer for the type specified by schema in unstructured.Unstructured form
// using the specified client
func NewInformer(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*Informer, error) {
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
return client.Watch(ctx, options)
}
if !watchable {
watchFunc = func(options metav1.ListOptions) (watch.Interface, error) {
ctx, cancel := context.WithCancel(ctx)
return newSyntheticWatcher(ctx, cancel).watch(client, options, defaultRefreshTime)
}
}
listWatcher := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
a, err := client.List(ctx, options)
if err != nil {
return nil, err
}
// We want the list to be consistent when there are going to be relists
sort.SliceStable(a.Items, func(i int, j int) bool {
var err error
rvI, err1 := strconv.Atoi(a.Items[i].GetResourceVersion())
err = errors.Join(err, err1)
rvJ, err2 := strconv.Atoi(a.Items[j].GetResourceVersion())
err = errors.Join(err, err2)
if err != nil {
logrus.Debug("ResourceVersion not a number, falling back to string comparison")
return a.Items[i].GetResourceVersion() < a.Items[j].GetResourceVersion()
}
return rvI < rvJ
})
return a, err
},
WatchFunc: watchFunc,
}
example := &unstructured.Unstructured{}
example.SetGroupVersionKind(gvk)
2025-01-28 09:02:38 +01:00
// TL;DR: this disables the Informer periodic resync - but this is inconsequential
//
// Long version: Informers use a Reflector to pull data from a ListWatcher and push it into a DeltaFIFO.
// Concurrently, they pop data off the DeltaFIFO to fire registered handlers, and also to keep an updated
// copy of the known state of all objects (in an Indexer).
// The resync period option here is passed from Informer to Reflector to periodically (re)-push all known
// objects to the DeltaFIFO. That causes the periodic (re-)firing all registered handlers.
// In this case we are not registering any handlers to this particular informer, so re-syncing is a no-op.
2025-01-28 09:02:38 +01:00
// We therefore just disable it right away.
resyncPeriod := time.Duration(0)
sii := newInformer(listWatcher, example, resyncPeriod, cache.Indexers{})
if transform != nil {
if err := sii.SetTransform(transform); err != nil {
return nil, err
}
}
name := informerNameFromGVK(gvk)
Hard-wire external associations: 5/7: update A=>B links when instances of A change (#646) * Continue rebasing. * Wrote unit tests for external associations. * Fix the generated SQL. Some syntactic sugar (capitalizing the keywords), but use the 'ON' syntax on JOINs. * We want "management.cattle.io.projects:spec.displayName" not "...spec.clusterName" * Implement hard-wired external associations: * The table is in sqlproxy.proxy_store - externalGVKDependencies - a map of GVKs to dependencies. When the key GVK is updated, it triggers the updates in the database for the dependent GVKs, replacing fields as specified in the table. * This is done in an afterUpsert handler, but it's done after the transaction for the core GVK update is finished, because most likely the dependent GVK updates will depend on the final database values for the GVK being updated, and if we do it as part of the transaction the new values won't be committed to the database. * When an object is modified/created, check for external deps that need updating. * Stop emitting errors when joining tables if one of the tables doesn't exist. * Update unit test syntax for SQL queries. * And an override check This ensures we don't overwrite good data when pulling data from one table to another. * Drop labels, and use mgmt.cattle.io/spec.displayName There's no need to hardwire labels in proxy_store:typeSpecificIndexedFields because all labels are indexed in the shadow labels table. * Keep clusterName, add displayName for mgmt.cattle.io * Fix rebase/merge breakage. * Finish the merge: add the 'selfUpdateInfo' param where it didn't get inserted during merge. * Patch up rebase failures. * Now gomock generates named args. I give up.
2025-07-03 14:35:09 -07:00
s, err := sqlStore.NewStore(ctx, example, cache.DeletionHandlingMetaNamespaceKeyFunc, db, shouldEncrypt, gvk, name, externalUpdateInfo, selfUpdateInfo)
if err != nil {
return nil, err
}
opts := ListOptionIndexerOptions{
Fields: fields,
IsNamespaced: namespaced,
GCInterval: gcInterval,
GCKeepCount: gcKeepCount,
}
loi, err := NewListOptionIndexer(ctx, s, opts)
if err != nil {
return nil, err
}
// HACK: replace the default informer's indexer with the SQL based one
UnsafeSet(sii, "indexer", loi)
return &Informer{
SharedIndexInformer: sii,
ByOptionsLister: loi,
}, nil
}
// ListByOptions returns objects according to the specified list options and partitions.
// Specifically:
// - an unstructured list of resources belonging to any of the specified partitions
// - the total number of resources (returned list might be a subset depending on pagination options in lo)
// - a continue token, if there are more pages after the returned one
// - an error instead of all of the above if anything went wrong
func (i *Informer) ListByOptions(ctx context.Context, lo *sqltypes.ListOptions, partitions []partition.Partition, namespace string) (*unstructured.UnstructuredList, int, string, error) {
return i.ByOptionsLister.ListByOptions(ctx, lo, partitions, namespace)
}
// SetSyntheticWatchableInterval - call this function to override the default interval time of 5 seconds
func SetSyntheticWatchableInterval(interval time.Duration) {
defaultRefreshTime = interval
}
func informerNameFromGVK(gvk schema.GroupVersionKind) string {
return gvk.Group + "_" + gvk.Version + "_" + gvk.Kind
}