mirror of
https://github.com/rancher/steve.git
synced 2025-09-23 12:29:09 +00:00
Stop single caches instead of all of them (#812)
* Revert OnSchemas change work * Track schema changes * Only stop a single GVK informer factory * Add tests * Rename crd to crdClient * Rename s to sqlStore * Don't wait for synced caches if request is canceled * Move schematracker to pkg/sqlcache/schematracker
This commit is contained in:
@@ -19,7 +19,6 @@ import (
|
|||||||
authorizationv1 "k8s.io/api/authorization/v1"
|
authorizationv1 "k8s.io/api/authorization/v1"
|
||||||
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
k8sapimachineryschema "k8s.io/apimachinery/pkg/runtime/schema"
|
|
||||||
"k8s.io/client-go/discovery"
|
"k8s.io/client-go/discovery"
|
||||||
authorizationv1client "k8s.io/client-go/kubernetes/typed/authorization/v1"
|
authorizationv1client "k8s.io/client-go/kubernetes/typed/authorization/v1"
|
||||||
apiv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
apiv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||||
@@ -32,18 +31,15 @@ var (
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
type SchemasHandlerFunc func(schemas *schema2.Collection, changedSchemas map[string]*types.APISchema, deletedSomething bool) error
|
type SchemasHandlerFunc func(schemas *schema2.Collection) error
|
||||||
|
|
||||||
func (s SchemasHandlerFunc) OnSchemas(schemas *schema2.Collection, changedSchemas map[string]*types.APISchema, forceChange bool) error {
|
func (s SchemasHandlerFunc) OnSchemas(schemas *schema2.Collection) error {
|
||||||
return s(schemas, changedSchemas, forceChange)
|
return s(schemas)
|
||||||
}
|
}
|
||||||
|
|
||||||
type handler struct {
|
type handler struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
// refreshLock prevents refreshAll to be run in parallel
|
|
||||||
refreshLock sync.Mutex
|
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
toSync int32
|
toSync int32
|
||||||
schemas *schema2.Collection
|
schemas *schema2.Collection
|
||||||
@@ -52,11 +48,6 @@ type handler struct {
|
|||||||
crdClient apiextcontrollerv1.CustomResourceDefinitionClient
|
crdClient apiextcontrollerv1.CustomResourceDefinitionClient
|
||||||
ssar authorizationv1client.SelfSubjectAccessReviewInterface
|
ssar authorizationv1client.SelfSubjectAccessReviewInterface
|
||||||
handler SchemasHandlerFunc
|
handler SchemasHandlerFunc
|
||||||
changedIDs map[k8sapimachineryschema.GroupVersionKind]bool
|
|
||||||
createdCRDs map[k8sapimachineryschema.GroupVersionKind]bool
|
|
||||||
deletedCRDs map[k8sapimachineryschema.GroupVersionKind]bool
|
|
||||||
apiServiceChanged bool
|
|
||||||
gvksFromKeys map[string][]k8sapimachineryschema.GroupVersionKind
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Register(ctx context.Context,
|
func Register(ctx context.Context,
|
||||||
@@ -76,58 +67,18 @@ func Register(ctx context.Context,
|
|||||||
handler: schemasHandler,
|
handler: schemasHandler,
|
||||||
crdClient: crd,
|
crdClient: crd,
|
||||||
ssar: ssar,
|
ssar: ssar,
|
||||||
changedIDs: make(map[k8sapimachineryschema.GroupVersionKind]bool),
|
|
||||||
gvksFromKeys: make(map[string][]k8sapimachineryschema.GroupVersionKind),
|
|
||||||
createdCRDs: make(map[k8sapimachineryschema.GroupVersionKind]bool),
|
|
||||||
deletedCRDs: make(map[k8sapimachineryschema.GroupVersionKind]bool),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
apiService.OnChange(ctx, "schema", h.OnChangeAPIService)
|
apiService.OnChange(ctx, "schema", h.OnChangeAPIService)
|
||||||
crd.OnChange(ctx, "schema", h.OnChangeCRD)
|
crd.OnChange(ctx, "schema", h.OnChangeCRD)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) handleDeletedCRD(key string, crd *apiextv1.CustomResourceDefinition) {
|
|
||||||
h.Lock()
|
|
||||||
defer h.Unlock()
|
|
||||||
gvkList, ok := h.gvksFromKeys[key]
|
|
||||||
if !ok {
|
|
||||||
logrus.Infof("No associated GVK for CRD key %s", key)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, gvk := range gvkList {
|
|
||||||
h.deletedCRDs[gvk] = true
|
|
||||||
}
|
|
||||||
delete(h.gvksFromKeys, key) // Don't need this anymore
|
|
||||||
h.queueRefresh()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *handler) OnChangeCRD(key string, crd *apiextv1.CustomResourceDefinition) (*apiextv1.CustomResourceDefinition, error) {
|
func (h *handler) OnChangeCRD(key string, crd *apiextv1.CustomResourceDefinition) (*apiextv1.CustomResourceDefinition, error) {
|
||||||
if crd == nil {
|
|
||||||
h.handleDeletedCRD(key, crd)
|
|
||||||
return crd, nil
|
|
||||||
}
|
|
||||||
spec := crd.Spec
|
|
||||||
group := spec.Group
|
|
||||||
kind := spec.Names.Kind
|
|
||||||
gvkList := make([]k8sapimachineryschema.GroupVersionKind, len(spec.Versions))
|
|
||||||
h.Lock()
|
|
||||||
defer h.Unlock()
|
|
||||||
for i, version := range spec.Versions {
|
|
||||||
gvk := k8sapimachineryschema.GroupVersionKind{Group: group, Version: version.Name, Kind: kind}
|
|
||||||
gvkList[i] = gvk
|
|
||||||
h.changedIDs[gvk] = true
|
|
||||||
_, ok := h.gvksFromKeys[key]
|
|
||||||
if !ok {
|
|
||||||
h.createdCRDs[gvk] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
h.gvksFromKeys[key] = gvkList
|
|
||||||
h.queueRefresh()
|
h.queueRefresh()
|
||||||
return crd, nil
|
return crd, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) OnChangeAPIService(key string, api *apiv1.APIService) (*apiv1.APIService, error) {
|
func (h *handler) OnChangeAPIService(key string, api *apiv1.APIService) (*apiv1.APIService, error) {
|
||||||
h.apiServiceChanged = true
|
|
||||||
h.queueRefresh()
|
h.queueRefresh()
|
||||||
return api, nil
|
return api, nil
|
||||||
}
|
}
|
||||||
@@ -137,34 +88,7 @@ func (h *handler) queueRefresh() {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(500 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
var err error
|
if err := h.refreshAll(h.ctx); err != nil {
|
||||||
var changedIDs map[k8sapimachineryschema.GroupVersionKind]bool
|
|
||||||
var deletedCRDs map[k8sapimachineryschema.GroupVersionKind]bool
|
|
||||||
var createdCRDs map[k8sapimachineryschema.GroupVersionKind]bool
|
|
||||||
var apiServiceChanged bool
|
|
||||||
h.Lock()
|
|
||||||
if len(h.createdCRDs) > 0 {
|
|
||||||
createdCRDs = h.createdCRDs
|
|
||||||
h.createdCRDs = make(map[k8sapimachineryschema.GroupVersionKind]bool)
|
|
||||||
}
|
|
||||||
if len(h.deletedCRDs) > 0 {
|
|
||||||
deletedCRDs = h.deletedCRDs
|
|
||||||
h.deletedCRDs = make(map[k8sapimachineryschema.GroupVersionKind]bool)
|
|
||||||
}
|
|
||||||
if len(h.changedIDs) > 0 {
|
|
||||||
changedIDs = h.changedIDs
|
|
||||||
h.changedIDs = make(map[k8sapimachineryschema.GroupVersionKind]bool)
|
|
||||||
}
|
|
||||||
if h.apiServiceChanged {
|
|
||||||
apiServiceChanged = true
|
|
||||||
h.apiServiceChanged = false
|
|
||||||
}
|
|
||||||
h.Unlock()
|
|
||||||
crdNumCountChanged := len(deletedCRDs) > 0 || len(createdCRDs) > 0
|
|
||||||
if len(changedIDs) > 0 || apiServiceChanged || crdNumCountChanged {
|
|
||||||
err = h.refreshAll(h.ctx, changedIDs, crdNumCountChanged)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
logrus.Errorf("failed to sync schemas: %v", err)
|
logrus.Errorf("failed to sync schemas: %v", err)
|
||||||
atomic.StoreInt32(&h.toSync, 1)
|
atomic.StoreInt32(&h.toSync, 1)
|
||||||
}
|
}
|
||||||
@@ -224,9 +148,9 @@ func (h *handler) getColumns(ctx context.Context, schemas map[string]*types.APIS
|
|||||||
return eg.Wait()
|
return eg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) refreshAll(ctx context.Context, changedGVKs map[k8sapimachineryschema.GroupVersionKind]bool, forceChange bool) error {
|
func (h *handler) refreshAll(ctx context.Context) error {
|
||||||
h.refreshLock.Lock()
|
h.Lock()
|
||||||
defer h.refreshLock.Unlock()
|
defer h.Unlock()
|
||||||
|
|
||||||
if !h.needToSync() {
|
if !h.needToSync() {
|
||||||
return nil
|
return nil
|
||||||
@@ -238,7 +162,6 @@ func (h *handler) refreshAll(ctx context.Context, changedGVKs map[k8sapimachiner
|
|||||||
}
|
}
|
||||||
|
|
||||||
filteredSchemas := map[string]*types.APISchema{}
|
filteredSchemas := map[string]*types.APISchema{}
|
||||||
changedSchemasByID := map[string]*types.APISchema{}
|
|
||||||
for _, schema := range schemas {
|
for _, schema := range schemas {
|
||||||
if IsListWatchable(schema) {
|
if IsListWatchable(schema) {
|
||||||
if preferredTypeExists(schema, schemas) {
|
if preferredTypeExists(schema, schemas) {
|
||||||
@@ -258,10 +181,6 @@ func (h *handler) refreshAll(ctx context.Context, changedGVKs map[k8sapimachiner
|
|||||||
schema.PluralName = converter.GVRToPluralName(gvr)
|
schema.PluralName = converter.GVRToPluralName(gvr)
|
||||||
}
|
}
|
||||||
filteredSchemas[schema.ID] = schema
|
filteredSchemas[schema.ID] = schema
|
||||||
if changedGVKs[gvk] {
|
|
||||||
// nil[x] is always false if the first-time runner called this
|
|
||||||
changedSchemasByID[schema.ID] = schema
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := h.getColumns(h.ctx, filteredSchemas); err != nil {
|
if err := h.getColumns(h.ctx, filteredSchemas); err != nil {
|
||||||
@@ -270,7 +189,7 @@ func (h *handler) refreshAll(ctx context.Context, changedGVKs map[k8sapimachiner
|
|||||||
|
|
||||||
h.schemas.Reset(filteredSchemas)
|
h.schemas.Reset(filteredSchemas)
|
||||||
if h.handler != nil {
|
if h.handler != nil {
|
||||||
return h.handler.OnSchemas(h.schemas, changedSchemasByID, forceChange)
|
return h.handler.OnSchemas(h.schemas)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@@ -5,8 +5,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"slices"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
apiserver "github.com/rancher/apiserver/pkg/server"
|
apiserver "github.com/rancher/apiserver/pkg/server"
|
||||||
"github.com/rancher/apiserver/pkg/types"
|
"github.com/rancher/apiserver/pkg/types"
|
||||||
@@ -26,12 +24,12 @@ import (
|
|||||||
"github.com/rancher/steve/pkg/server/handler"
|
"github.com/rancher/steve/pkg/server/handler"
|
||||||
"github.com/rancher/steve/pkg/server/router"
|
"github.com/rancher/steve/pkg/server/router"
|
||||||
"github.com/rancher/steve/pkg/sqlcache/informer/factory"
|
"github.com/rancher/steve/pkg/sqlcache/informer/factory"
|
||||||
|
"github.com/rancher/steve/pkg/sqlcache/schematracker"
|
||||||
metricsStore "github.com/rancher/steve/pkg/stores/metrics"
|
metricsStore "github.com/rancher/steve/pkg/stores/metrics"
|
||||||
"github.com/rancher/steve/pkg/stores/proxy"
|
"github.com/rancher/steve/pkg/stores/proxy"
|
||||||
"github.com/rancher/steve/pkg/stores/sqlpartition"
|
"github.com/rancher/steve/pkg/stores/sqlpartition"
|
||||||
"github.com/rancher/steve/pkg/stores/sqlproxy"
|
"github.com/rancher/steve/pkg/stores/sqlproxy"
|
||||||
"github.com/rancher/steve/pkg/summarycache"
|
"github.com/rancher/steve/pkg/summarycache"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -235,63 +233,25 @@ func setup(ctx context.Context, server *Server) error {
|
|||||||
for _, template := range resources.DefaultSchemaTemplatesForStore(store, server.BaseSchemas, summaryCache, asl, server.controllers.K8s.Discovery(), common.TemplateOptions{InSQLMode: true}) {
|
for _, template := range resources.DefaultSchemaTemplatesForStore(store, server.BaseSchemas, summaryCache, asl, server.controllers.K8s.Discovery(), common.TemplateOptions{InSQLMode: true}) {
|
||||||
sf.AddTemplate(template)
|
sf.AddTemplate(template)
|
||||||
}
|
}
|
||||||
mutex := &sync.Mutex{}
|
|
||||||
fieldsForSchema := make(map[string][][]string) // map schemaID to fields
|
|
||||||
initializedDB := false
|
|
||||||
|
|
||||||
onSchemasHandler = func(schemas *schema.Collection, changedSchemas map[string]*types.APISchema, deletedSomething bool) error {
|
sqlSchemaTracker := schematracker.NewSchemaTracker(sqlStore)
|
||||||
resetEverything := false
|
|
||||||
// We need a mutex around the fieldsForSchema closure because this handler is invoked asynchronously
|
onSchemasHandler = func(schemas *schema.Collection) error {
|
||||||
// from the server
|
var retErr error
|
||||||
mutex.Lock()
|
|
||||||
if !initializedDB {
|
err := ccache.OnSchemas(schemas)
|
||||||
initializedDB = true
|
retErr = errors.Join(retErr, err)
|
||||||
resetEverything = true
|
|
||||||
for _, id := range schemas.IDs() {
|
err = sqlSchemaTracker.OnSchemas(schemas)
|
||||||
theSchema := schemas.Schema(id)
|
retErr = errors.Join(retErr, err)
|
||||||
if theSchema == nil {
|
|
||||||
fieldsForSchema[id] = [][]string{}
|
return retErr
|
||||||
continue
|
|
||||||
}
|
|
||||||
fieldsForSchema[id] = sqlproxy.GetFieldsFromSchema(theSchema)
|
|
||||||
}
|
|
||||||
logrus.Debugf("onSchemasHandler: need to reset everything on first run")
|
|
||||||
} else {
|
|
||||||
for id, theSchema := range changedSchemas {
|
|
||||||
oldFields, ok := fieldsForSchema[id]
|
|
||||||
newFields := sqlproxy.GetFieldsFromSchema(theSchema)
|
|
||||||
if !ok || !slices.EqualFunc(oldFields, newFields,
|
|
||||||
func(s1, s2 []string) bool {
|
|
||||||
return slices.Equal(s1, s2)
|
|
||||||
}) {
|
|
||||||
resetEverything = true
|
|
||||||
}
|
|
||||||
fieldsForSchema[id] = newFields
|
|
||||||
}
|
|
||||||
if deletedSomething {
|
|
||||||
resetEverything = true
|
|
||||||
}
|
|
||||||
logrus.Debugf("onSchemasHandler: need to reset everything: %t", resetEverything)
|
|
||||||
}
|
|
||||||
mutex.Unlock()
|
|
||||||
if !resetEverything {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err := ccache.OnSchemas(schemas); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := sqlStore.Reset(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for _, template := range resources.DefaultSchemaTemplates(cf, server.BaseSchemas, summaryCache, asl, server.controllers.K8s.Discovery(), server.controllers.Core.Namespace().Cache(), common.TemplateOptions{InSQLMode: false}) {
|
for _, template := range resources.DefaultSchemaTemplates(cf, server.BaseSchemas, summaryCache, asl, server.controllers.K8s.Discovery(), server.controllers.Core.Namespace().Cache(), common.TemplateOptions{InSQLMode: false}) {
|
||||||
sf.AddTemplate(template)
|
sf.AddTemplate(template)
|
||||||
}
|
}
|
||||||
onSchemasHandler = func(schemas *schema.Collection, _ map[string]*types.APISchema, _ bool) error {
|
onSchemasHandler = ccache.OnSchemas
|
||||||
return ccache.OnSchemas(schemas)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
schemas.SetupWatcher(ctx, server.BaseSchemas, asl, sf)
|
schemas.SetupWatcher(ctx, server.BaseSchemas, asl, sf)
|
||||||
|
@@ -28,14 +28,12 @@ const EncryptAllEnvVar = "CATTLE_ENCRYPT_CACHE_ALL"
|
|||||||
|
|
||||||
// CacheFactory builds Informer instances and keeps a cache of instances it created
|
// CacheFactory builds Informer instances and keeps a cache of instances it created
|
||||||
type CacheFactory struct {
|
type CacheFactory struct {
|
||||||
wg wait.Group
|
|
||||||
dbClient db.Client
|
dbClient db.Client
|
||||||
|
|
||||||
// ctx determines when informers need to stop
|
// ctx determines when informers need to stop
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
|
||||||
mutex sync.RWMutex
|
|
||||||
encryptAll bool
|
encryptAll bool
|
||||||
|
|
||||||
gcInterval time.Duration
|
gcInterval time.Duration
|
||||||
@@ -49,13 +47,31 @@ type CacheFactory struct {
|
|||||||
|
|
||||||
type guardedInformer struct {
|
type guardedInformer struct {
|
||||||
informer *informer.Informer
|
informer *informer.Informer
|
||||||
mutex *sync.Mutex
|
// informerMutex ensures informer is only set by one goroutine even if
|
||||||
|
// multiple concurrent calls to CacheFor are made
|
||||||
|
informerMutex *sync.Mutex
|
||||||
|
|
||||||
|
// stopMutex ensures no CacheFor call can be made for a given GVK when
|
||||||
|
// a Stop call is ongoing.
|
||||||
|
//
|
||||||
|
// CacheFactory.informersMutex is not enough because part of the code
|
||||||
|
// might still have an old cache from a previous CacheFor call.
|
||||||
|
stopMutex *sync.RWMutex
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
wg wait.Group
|
||||||
}
|
}
|
||||||
|
|
||||||
type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error)
|
type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error)
|
||||||
|
|
||||||
type Cache struct {
|
type Cache struct {
|
||||||
informer.ByOptionsLister
|
informer.ByOptionsLister
|
||||||
|
gvk schema.GroupVersionKind
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Cache) GVK() schema.GroupVersionKind {
|
||||||
|
return c.gvk
|
||||||
}
|
}
|
||||||
|
|
||||||
var defaultEncryptedResourceTypes = map[schema.GroupVersionKind]struct{}{
|
var defaultEncryptedResourceTypes = map[schema.GroupVersionKind]struct{}{
|
||||||
@@ -90,8 +106,6 @@ func NewCacheFactory(opts CacheFactoryOptions) (*CacheFactory, error) {
|
|||||||
}
|
}
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
return &CacheFactory{
|
return &CacheFactory{
|
||||||
wg: wait.Group{},
|
|
||||||
|
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
|
|
||||||
@@ -111,17 +125,6 @@ func NewCacheFactory(opts CacheFactoryOptions) (*CacheFactory, error) {
|
|||||||
//
|
//
|
||||||
// Don't forget to call DoneWithCache with the given informer once done with it.
|
// Don't forget to call DoneWithCache with the given informer once done with it.
|
||||||
func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*Cache, error) {
|
func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*Cache, error) {
|
||||||
// First of all block Reset() until we are done
|
|
||||||
f.mutex.RLock()
|
|
||||||
cache, err := f.cacheForLocked(ctx, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, namespaced, watchable)
|
|
||||||
if err != nil {
|
|
||||||
f.mutex.RUnlock()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return cache, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*Cache, error) {
|
|
||||||
// Second, check if the informer and its accompanying informer-specific mutex exist already in the informers cache
|
// Second, check if the informer and its accompanying informer-specific mutex exist already in the informers cache
|
||||||
// If not, start by creating such informer-specific mutex. That is used later to ensure no two goroutines create
|
// If not, start by creating such informer-specific mutex. That is used later to ensure no two goroutines create
|
||||||
// informers for the same GVK at the same type
|
// informers for the same GVK at the same type
|
||||||
@@ -130,17 +133,32 @@ func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, ex
|
|||||||
// that blocks CacheFor for other GVKs, hence not deferring unlock here
|
// that blocks CacheFor for other GVKs, hence not deferring unlock here
|
||||||
gi, ok := f.informers[gvk]
|
gi, ok := f.informers[gvk]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
giCtx, giCancel := context.WithCancel(f.ctx)
|
||||||
gi = &guardedInformer{
|
gi = &guardedInformer{
|
||||||
informer: nil,
|
informer: nil,
|
||||||
mutex: &sync.Mutex{},
|
informerMutex: &sync.Mutex{},
|
||||||
|
stopMutex: &sync.RWMutex{},
|
||||||
|
ctx: giCtx,
|
||||||
|
cancel: giCancel,
|
||||||
}
|
}
|
||||||
f.informers[gvk] = gi
|
f.informers[gvk] = gi
|
||||||
}
|
}
|
||||||
f.informersMutex.Unlock()
|
f.informersMutex.Unlock()
|
||||||
|
|
||||||
|
// Prevent Stop() to be called for that GVK
|
||||||
|
gi.stopMutex.RLock()
|
||||||
|
|
||||||
|
gvkCache, err := f.cacheForLocked(ctx, gi, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, namespaced, watchable)
|
||||||
|
if err != nil {
|
||||||
|
gi.stopMutex.RUnlock()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return gvkCache, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *CacheFactory) cacheForLocked(ctx context.Context, gi *guardedInformer, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*Cache, error) {
|
||||||
// At this point an informer-specific mutex (gi.mutex) is guaranteed to exist. Lock it
|
// At this point an informer-specific mutex (gi.mutex) is guaranteed to exist. Lock it
|
||||||
gi.mutex.Lock()
|
gi.informerMutex.Lock()
|
||||||
defer gi.mutex.Unlock()
|
|
||||||
|
|
||||||
// Then: if the informer really was not created yet (first time here or previous times have errored out)
|
// Then: if the informer really was not created yet (first time here or previous times have errored out)
|
||||||
// actually create the informer
|
// actually create the informer
|
||||||
@@ -155,8 +173,9 @@ func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, ex
|
|||||||
shouldEncrypt := f.encryptAll || encryptResourceAlways
|
shouldEncrypt := f.encryptAll || encryptResourceAlways
|
||||||
// In non-test code this invokes pkg/sqlcache/informer/informer.go: NewInformer()
|
// In non-test code this invokes pkg/sqlcache/informer/informer.go: NewInformer()
|
||||||
// search for "func NewInformer(ctx"
|
// search for "func NewInformer(ctx"
|
||||||
i, err := f.newInformer(f.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, f.gcInterval, f.gcKeepCount)
|
i, err := f.newInformer(gi.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, f.gcInterval, f.gcKeepCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
gi.informerMutex.Unlock()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -168,69 +187,92 @@ func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, ex
|
|||||||
cache.DefaultWatchErrorHandler(ctx, r, err)
|
cache.DefaultWatchErrorHandler(ctx, r, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
gi.informerMutex.Unlock()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
f.wg.StartWithChannel(f.ctx.Done(), i.Run)
|
gi.wg.StartWithChannel(gi.ctx.Done(), i.Run)
|
||||||
|
|
||||||
gi.informer = i
|
gi.informer = i
|
||||||
}
|
}
|
||||||
|
gi.informerMutex.Unlock()
|
||||||
|
|
||||||
if !cache.WaitForCacheSync(f.ctx.Done(), gi.informer.HasSynced) {
|
// We don't want to get stuck in WaitForCachesSync if the request from
|
||||||
|
// the client has been canceled.
|
||||||
|
waitCh := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
close(waitCh)
|
||||||
|
case <-gi.ctx.Done():
|
||||||
|
close(waitCh)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if !cache.WaitForCacheSync(waitCh, gi.informer.HasSynced) {
|
||||||
return nil, fmt.Errorf("failed to sync SQLite Informer cache for GVK %v", gvk)
|
return nil, fmt.Errorf("failed to sync SQLite Informer cache for GVK %v", gvk)
|
||||||
}
|
}
|
||||||
|
|
||||||
// At this point the informer is ready, return it
|
// At this point the informer is ready, return it
|
||||||
return &Cache{ByOptionsLister: gi.informer}, nil
|
return &Cache{ByOptionsLister: gi.informer, gvk: gvk}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DoneWithCache must be called for every CacheFor call.
|
// DoneWithCache must be called for every successful CacheFor call. The Cache should
|
||||||
|
// no longer be used after DoneWithCache is called.
|
||||||
//
|
//
|
||||||
// This ensures that there aren't any inflight list requests while we are resetting the database.
|
// This ensures that there aren't any inflight list requests while we are resetting the database.
|
||||||
//
|
func (f *CacheFactory) DoneWithCache(cache *Cache) {
|
||||||
// TODO: Use the *Cache once we go per-GVK
|
if cache == nil {
|
||||||
func (f *CacheFactory) DoneWithCache(_ *Cache) {
|
return
|
||||||
f.mutex.RUnlock()
|
}
|
||||||
|
|
||||||
|
f.informersMutex.Lock()
|
||||||
|
defer f.informersMutex.Unlock()
|
||||||
|
|
||||||
|
// Note: the informers cache is protected by informersMutex, which we don't want to hold for very long because
|
||||||
|
// that blocks CacheFor for other GVKs, hence not deferring unlock here
|
||||||
|
gi, ok := f.informers[cache.gvk]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
gi.stopMutex.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop cancels ctx which stops any running informers, assigns a new ctx, resets the GVK-informer cache, and resets
|
// Stop cancels ctx which stops any running informers, assigns a new ctx, resets the GVK-informer cache, and resets
|
||||||
// the database connection which wipes any current sqlite database at the default location.
|
// the database connection which wipes any current sqlite database at the default location.
|
||||||
func (f *CacheFactory) Stop() error {
|
func (f *CacheFactory) Stop(gvk schema.GroupVersionKind) error {
|
||||||
if f.dbClient == nil {
|
if f.dbClient == nil {
|
||||||
// nothing to reset
|
// nothing to reset
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// We must stop informers here to unblock those stuck in WaitForCacheSync
|
|
||||||
// which is blocking DoneWithCache call.
|
|
||||||
//
|
|
||||||
// This is fine without a lock as long as multiple Stop() call aren't made
|
|
||||||
// concurrently (which they currently aren't)
|
|
||||||
f.cancel()
|
|
||||||
|
|
||||||
// Prevent more CacheFor calls
|
|
||||||
f.mutex.Lock()
|
|
||||||
defer f.mutex.Unlock()
|
|
||||||
|
|
||||||
// Wait for all informers to have exited
|
|
||||||
f.wg.Wait()
|
|
||||||
|
|
||||||
f.ctx, f.cancel = context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
// and get rid of all references to those informers and their mutexes
|
|
||||||
f.informersMutex.Lock()
|
f.informersMutex.Lock()
|
||||||
defer f.informersMutex.Unlock()
|
defer f.informersMutex.Unlock()
|
||||||
|
|
||||||
for gvk, informer := range f.informers {
|
gi, ok := f.informers[gvk]
|
||||||
// DropAll needs its own context because the context from the CacheFactory
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
delete(f.informers, gvk)
|
||||||
|
|
||||||
|
// We must stop informers here to unblock those stuck in WaitForCacheSync
|
||||||
|
// which is blocking DoneWithCache call.
|
||||||
|
gi.cancel()
|
||||||
|
|
||||||
|
// Prevent other CacheFor calls for that GVK
|
||||||
|
gi.stopMutex.Lock()
|
||||||
|
defer gi.stopMutex.Unlock()
|
||||||
|
|
||||||
|
// Wait for all informers to have exited
|
||||||
|
gi.wg.Wait()
|
||||||
|
|
||||||
|
// DropAll needs its own context because the context from the informer
|
||||||
// is canceled
|
// is canceled
|
||||||
err := informer.informer.DropAll(context.Background())
|
err := gi.informer.DropAll(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("dropall %q: %w", gvk, err)
|
return fmt.Errorf("dropall %q: %w", gvk, err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
f.informers = make(map[schema.GroupVersionKind]*guardedInformer)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@@ -79,6 +79,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
}
|
}
|
||||||
expectedC := &Cache{
|
expectedC := &Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
|
gvk: expectedGVK,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
@@ -100,7 +101,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
// this function ensures that ctx is open for the duration of this test but if part of a longer process it will be closed eventually
|
// this function ensures that ctx is open for the duration of this test but if part of a longer process it will be closed eventually
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
f.Stop()
|
f.Stop(expectedGVK)
|
||||||
}()
|
}()
|
||||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
@@ -148,13 +149,63 @@ func TestCacheFor(t *testing.T) {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
f.Stop()
|
f.Stop(expectedGVK)
|
||||||
}()
|
}()
|
||||||
var err error
|
_, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||||
_, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
}})
|
}})
|
||||||
|
tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning false, request is canceled", test: func(t *testing.T) {
|
||||||
|
dbClient := NewMockClient(gomock.NewController(t))
|
||||||
|
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
|
||||||
|
fields := [][]string{{"something"}}
|
||||||
|
expectedGVK := schema.GroupVersionKind{}
|
||||||
|
|
||||||
|
bloi := NewMockByOptionsLister(gomock.NewController(t))
|
||||||
|
bloi.EXPECT().RunGC(gomock.Any()).AnyTimes()
|
||||||
|
bloi.EXPECT().DropAll(gomock.Any()).AnyTimes()
|
||||||
|
sii := NewMockSharedIndexInformer(gomock.NewController(t))
|
||||||
|
sii.EXPECT().HasSynced().Return(false).AnyTimes()
|
||||||
|
sii.EXPECT().Run(gomock.Any())
|
||||||
|
sii.EXPECT().SetWatchErrorHandler(gomock.Any())
|
||||||
|
expectedI := &informer.Informer{
|
||||||
|
// need to set this so Run function is not nil
|
||||||
|
SharedIndexInformer: sii,
|
||||||
|
ByOptionsLister: bloi,
|
||||||
|
}
|
||||||
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
||||||
|
assert.Equal(t, client, dynamicClient)
|
||||||
|
assert.Equal(t, fields, fields)
|
||||||
|
assert.Equal(t, expectedGVK, gvk)
|
||||||
|
assert.Equal(t, db, dbClient)
|
||||||
|
assert.Equal(t, false, shouldEncrypt)
|
||||||
|
assert.Equal(t, 0, gcKeepCount)
|
||||||
|
assert.Nil(t, externalUpdateInfo)
|
||||||
|
return expectedI, nil
|
||||||
|
}
|
||||||
|
f := &CacheFactory{
|
||||||
|
dbClient: dbClient,
|
||||||
|
newInformer: testNewInformer,
|
||||||
|
informers: map[schema.GroupVersionKind]*guardedInformer{},
|
||||||
|
}
|
||||||
|
f.ctx, f.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
_, err := f.CacheFor(ctx, fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||||
|
errCh <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
assert.Fail(t, "CacheFor never exited")
|
||||||
|
}
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
}})
|
||||||
tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, and ctx is canceled, should not call Run() more than once and not return an error", test: func(t *testing.T) {
|
tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, and ctx is canceled, should not call Run() more than once and not return an error", test: func(t *testing.T) {
|
||||||
dbClient := NewMockClient(gomock.NewController(t))
|
dbClient := NewMockClient(gomock.NewController(t))
|
||||||
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
|
dynamicClient := NewMockResourceInterface(gomock.NewController(t))
|
||||||
@@ -176,6 +227,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
}
|
}
|
||||||
expectedC := &Cache{
|
expectedC := &Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
|
gvk: expectedGVK,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
@@ -193,7 +245,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
informers: map[schema.GroupVersionKind]*guardedInformer{},
|
informers: map[schema.GroupVersionKind]*guardedInformer{},
|
||||||
}
|
}
|
||||||
f.ctx, f.cancel = context.WithCancel(context.Background())
|
f.ctx, f.cancel = context.WithCancel(context.Background())
|
||||||
f.Stop()
|
f.Stop(expectedGVK)
|
||||||
|
|
||||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
@@ -219,6 +271,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
}
|
}
|
||||||
expectedC := &Cache{
|
expectedC := &Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
|
gvk: expectedGVK,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
@@ -240,7 +293,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
f.Stop()
|
f.Stop(expectedGVK)
|
||||||
}()
|
}()
|
||||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
@@ -271,6 +324,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
}
|
}
|
||||||
expectedC := &Cache{
|
expectedC := &Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
|
gvk: expectedGVK,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
@@ -292,7 +346,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
f.Stop()
|
f.Stop(expectedGVK)
|
||||||
}()
|
}()
|
||||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
@@ -322,6 +376,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
}
|
}
|
||||||
expectedC := &Cache{
|
expectedC := &Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
|
gvk: expectedGVK,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
@@ -343,7 +398,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
f.Stop()
|
f.Stop(expectedGVK)
|
||||||
}()
|
}()
|
||||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
@@ -373,6 +428,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
}
|
}
|
||||||
expectedC := &Cache{
|
expectedC := &Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
|
gvk: expectedGVK,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
||||||
// we can't test func == func, so instead we check if the output was as expected
|
// we can't test func == func, so instead we check if the output was as expected
|
||||||
@@ -402,7 +458,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
// this function ensures that ctx is not canceled for the duration of this test but if part of a longer process it will be closed eventually
|
// this function ensures that ctx is not canceled for the duration of this test but if part of a longer process it will be closed eventually
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
f.Stop()
|
f.Stop(expectedGVK)
|
||||||
}()
|
}()
|
||||||
var c *Cache
|
var c *Cache
|
||||||
var err error
|
var err error
|
||||||
@@ -430,6 +486,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
}
|
}
|
||||||
expectedC := &Cache{
|
expectedC := &Cache{
|
||||||
ByOptionsLister: i,
|
ByOptionsLister: i,
|
||||||
|
gvk: expectedGVK,
|
||||||
}
|
}
|
||||||
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) {
|
||||||
assert.Equal(t, client, dynamicClient)
|
assert.Equal(t, client, dynamicClient)
|
||||||
@@ -454,7 +511,7 @@ func TestCacheFor(t *testing.T) {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
f.Stop()
|
f.Stop(expectedGVK)
|
||||||
}()
|
}()
|
||||||
// CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool)
|
// CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool)
|
||||||
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true)
|
||||||
|
@@ -97,7 +97,7 @@ func (i *IntegrationSuite) TestSQLCacheFilters() {
|
|||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
defer func() {
|
defer func() {
|
||||||
cacheFactory.DoneWithCache(cache)
|
cacheFactory.DoneWithCache(cache)
|
||||||
cacheFactory.Stop()
|
cacheFactory.Stop(cache.GVK())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// doesn't match the filter for somekey == somevalue
|
// doesn't match the filter for somekey == somevalue
|
||||||
|
78
pkg/sqlcache/schematracker/schema.go
Normal file
78
pkg/sqlcache/schematracker/schema.go
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
package schematracker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"slices"
|
||||||
|
|
||||||
|
"github.com/rancher/steve/pkg/attributes"
|
||||||
|
"github.com/rancher/steve/pkg/resources/common"
|
||||||
|
"github.com/rancher/steve/pkg/schema"
|
||||||
|
k8sschema "k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Resetter interface {
|
||||||
|
Reset(k8sschema.GroupVersionKind) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type SchemaTracker struct {
|
||||||
|
knownSchemas map[k8sschema.GroupVersionKind][]common.ColumnDefinition
|
||||||
|
resetter Resetter
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSchemaTracker(resetter Resetter) *SchemaTracker {
|
||||||
|
return &SchemaTracker{
|
||||||
|
knownSchemas: make(map[k8sschema.GroupVersionKind][]common.ColumnDefinition),
|
||||||
|
resetter: resetter,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SchemaTracker) OnSchemas(schemas *schema.Collection) error {
|
||||||
|
knownSchemas := make(map[k8sschema.GroupVersionKind][]common.ColumnDefinition)
|
||||||
|
|
||||||
|
needsReset := make(map[k8sschema.GroupVersionKind]struct{})
|
||||||
|
|
||||||
|
deletedSchemas := make(map[k8sschema.GroupVersionKind]struct{})
|
||||||
|
for gvk := range s.knownSchemas {
|
||||||
|
deletedSchemas[gvk] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, id := range schemas.IDs() {
|
||||||
|
theSchema := schemas.Schema(id)
|
||||||
|
if theSchema == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
gvk := attributes.GVK(theSchema)
|
||||||
|
|
||||||
|
cols := common.GetColumnDefinitions(theSchema)
|
||||||
|
|
||||||
|
knownSchemas[gvk] = cols
|
||||||
|
|
||||||
|
oldCols, exists := s.knownSchemas[gvk]
|
||||||
|
if exists {
|
||||||
|
if !slices.Equal(cols, oldCols) {
|
||||||
|
needsReset[gvk] = struct{}{}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
needsReset[gvk] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Schema is still there so it hasn't been deleted
|
||||||
|
delete(deletedSchemas, gvk)
|
||||||
|
}
|
||||||
|
|
||||||
|
// All deleted schemas must be resetted as well
|
||||||
|
for gvk := range deletedSchemas {
|
||||||
|
needsReset[gvk] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset known schemas
|
||||||
|
var retErr error
|
||||||
|
for gvk := range needsReset {
|
||||||
|
err := s.resetter.Reset(gvk)
|
||||||
|
retErr = errors.Join(retErr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.knownSchemas = knownSchemas
|
||||||
|
return retErr
|
||||||
|
}
|
231
pkg/sqlcache/schematracker/schema_test.go
Normal file
231
pkg/sqlcache/schematracker/schema_test.go
Normal file
@@ -0,0 +1,231 @@
|
|||||||
|
package schematracker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/rancher/apiserver/pkg/types"
|
||||||
|
"github.com/rancher/steve/pkg/attributes"
|
||||||
|
"github.com/rancher/steve/pkg/resources/common"
|
||||||
|
"github.com/rancher/steve/pkg/schema"
|
||||||
|
"github.com/rancher/wrangler/v3/pkg/schemas"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
k8sschema "k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testResetter struct {
|
||||||
|
Resets map[k8sschema.GroupVersionKind]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testResetter) Reset(gvk k8sschema.GroupVersionKind) error {
|
||||||
|
if r.Resets == nil {
|
||||||
|
r.Resets = make(map[k8sschema.GroupVersionKind]struct{})
|
||||||
|
}
|
||||||
|
r.Resets[gvk] = struct{}{}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSchemaTracker(t *testing.T) {
|
||||||
|
pods := &types.APISchema{
|
||||||
|
Schema: &schemas.Schema{ID: "pods"},
|
||||||
|
}
|
||||||
|
attributes.SetGVK(pods, k8sschema.GroupVersionKind{
|
||||||
|
Version: "v1",
|
||||||
|
Kind: "Pod",
|
||||||
|
})
|
||||||
|
attributes.SetGVR(pods, k8sschema.GroupVersionResource{
|
||||||
|
Version: "v1",
|
||||||
|
Resource: "pods",
|
||||||
|
})
|
||||||
|
|
||||||
|
configmaps := &types.APISchema{
|
||||||
|
Schema: &schemas.Schema{ID: "configmaps"},
|
||||||
|
}
|
||||||
|
attributes.SetGVK(configmaps, k8sschema.GroupVersionKind{
|
||||||
|
Version: "v1",
|
||||||
|
Kind: "ConfigMap",
|
||||||
|
})
|
||||||
|
attributes.SetGVR(configmaps, k8sschema.GroupVersionResource{
|
||||||
|
Version: "v1",
|
||||||
|
Resource: "configmaps",
|
||||||
|
})
|
||||||
|
|
||||||
|
foo := &types.APISchema{
|
||||||
|
Schema: &schemas.Schema{ID: "test.io.foos"},
|
||||||
|
}
|
||||||
|
attributes.SetGVK(foo, k8sschema.GroupVersionKind{
|
||||||
|
Group: "test.io",
|
||||||
|
Version: "v1",
|
||||||
|
Kind: "Foo",
|
||||||
|
})
|
||||||
|
attributes.SetGVR(foo, k8sschema.GroupVersionResource{
|
||||||
|
Group: "test.io",
|
||||||
|
Version: "v1",
|
||||||
|
Resource: "foos",
|
||||||
|
})
|
||||||
|
|
||||||
|
foos1 := &types.APISchema{
|
||||||
|
Schema: &schemas.Schema{ID: "test.io.foos"},
|
||||||
|
}
|
||||||
|
attributes.SetGVK(foos1, k8sschema.GroupVersionKind{
|
||||||
|
Group: "test.io",
|
||||||
|
Version: "v1",
|
||||||
|
Kind: "Foo",
|
||||||
|
})
|
||||||
|
attributes.SetGVR(foos1, k8sschema.GroupVersionResource{
|
||||||
|
Group: "test.io",
|
||||||
|
Version: "v1",
|
||||||
|
Resource: "foos",
|
||||||
|
})
|
||||||
|
attributes.SetColumns(foos1, []common.ColumnDefinition{
|
||||||
|
{Field: "field1"}, {Field: "field2"},
|
||||||
|
})
|
||||||
|
|
||||||
|
foos2 := &types.APISchema{
|
||||||
|
Schema: &schemas.Schema{ID: "test.io.foos"},
|
||||||
|
}
|
||||||
|
attributes.SetGVK(foos2, k8sschema.GroupVersionKind{
|
||||||
|
Group: "test.io",
|
||||||
|
Version: "v1",
|
||||||
|
Kind: "Foo",
|
||||||
|
})
|
||||||
|
attributes.SetGVR(foos2, k8sschema.GroupVersionResource{
|
||||||
|
Group: "test.io",
|
||||||
|
Version: "v1",
|
||||||
|
Resource: "foos",
|
||||||
|
})
|
||||||
|
attributes.SetColumns(foos2, []common.ColumnDefinition{
|
||||||
|
{Field: "field1"}, {Field: "field2"}, {Field: "field3"},
|
||||||
|
})
|
||||||
|
|
||||||
|
bars := &types.APISchema{
|
||||||
|
Schema: &schemas.Schema{ID: "test.io.bars"},
|
||||||
|
}
|
||||||
|
attributes.SetGVK(bars, k8sschema.GroupVersionKind{
|
||||||
|
Group: "test.io",
|
||||||
|
Version: "v1",
|
||||||
|
Kind: "Bar",
|
||||||
|
})
|
||||||
|
attributes.SetGVR(bars, k8sschema.GroupVersionResource{
|
||||||
|
Group: "test.io",
|
||||||
|
Version: "v1",
|
||||||
|
Resource: "bars",
|
||||||
|
})
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
initialSchemas map[string]*types.APISchema
|
||||||
|
refreshedSchemas map[string]*types.APISchema
|
||||||
|
expectedResets map[k8sschema.GroupVersionKind]struct{}
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no change",
|
||||||
|
initialSchemas: map[string]*types.APISchema{
|
||||||
|
"configmaps": configmaps,
|
||||||
|
},
|
||||||
|
refreshedSchemas: map[string]*types.APISchema{
|
||||||
|
"configmaps": configmaps,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "single schema added",
|
||||||
|
initialSchemas: map[string]*types.APISchema{
|
||||||
|
"configmaps": configmaps,
|
||||||
|
},
|
||||||
|
refreshedSchemas: map[string]*types.APISchema{
|
||||||
|
"configmaps": configmaps,
|
||||||
|
"pods": pods,
|
||||||
|
},
|
||||||
|
expectedResets: map[k8sschema.GroupVersionKind]struct{}{
|
||||||
|
attributes.GVK(pods): {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple schemas added",
|
||||||
|
initialSchemas: map[string]*types.APISchema{},
|
||||||
|
refreshedSchemas: map[string]*types.APISchema{
|
||||||
|
"configmaps": configmaps,
|
||||||
|
"pods": pods,
|
||||||
|
},
|
||||||
|
expectedResets: map[k8sschema.GroupVersionKind]struct{}{
|
||||||
|
attributes.GVK(configmaps): {},
|
||||||
|
attributes.GVK(pods): {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "single schema removed",
|
||||||
|
initialSchemas: map[string]*types.APISchema{
|
||||||
|
"configmaps": configmaps,
|
||||||
|
"pods": pods,
|
||||||
|
},
|
||||||
|
refreshedSchemas: map[string]*types.APISchema{
|
||||||
|
"pods": pods,
|
||||||
|
},
|
||||||
|
expectedResets: map[k8sschema.GroupVersionKind]struct{}{
|
||||||
|
attributes.GVK(configmaps): {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple schemas removed",
|
||||||
|
initialSchemas: map[string]*types.APISchema{
|
||||||
|
"configmaps": configmaps,
|
||||||
|
"pods": pods,
|
||||||
|
},
|
||||||
|
refreshedSchemas: map[string]*types.APISchema{},
|
||||||
|
expectedResets: map[k8sschema.GroupVersionKind]struct{}{
|
||||||
|
attributes.GVK(configmaps): {},
|
||||||
|
attributes.GVK(pods): {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "field changed",
|
||||||
|
initialSchemas: map[string]*types.APISchema{
|
||||||
|
"test.io.foos": foos1,
|
||||||
|
},
|
||||||
|
refreshedSchemas: map[string]*types.APISchema{
|
||||||
|
"test.io.foos": foos2,
|
||||||
|
},
|
||||||
|
expectedResets: map[k8sschema.GroupVersionKind]struct{}{
|
||||||
|
attributes.GVK(foos2): {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "added deleted and changed",
|
||||||
|
initialSchemas: map[string]*types.APISchema{
|
||||||
|
"configmaps": configmaps,
|
||||||
|
"pods": pods,
|
||||||
|
"test.io.foos": foos1,
|
||||||
|
},
|
||||||
|
refreshedSchemas: map[string]*types.APISchema{
|
||||||
|
"configmaps": configmaps,
|
||||||
|
"test.io.bars": bars,
|
||||||
|
"test.io.foos": foos2,
|
||||||
|
},
|
||||||
|
expectedResets: map[k8sschema.GroupVersionKind]struct{}{
|
||||||
|
attributes.GVK(foos2): {},
|
||||||
|
attributes.GVK(pods): {},
|
||||||
|
attributes.GVK(bars): {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
resetter := &testResetter{}
|
||||||
|
tracker := NewSchemaTracker(resetter)
|
||||||
|
collection := schema.NewCollection(context.TODO(), types.EmptyAPISchemas(), nil)
|
||||||
|
|
||||||
|
collection.Reset(test.initialSchemas)
|
||||||
|
err := tracker.OnSchemas(collection)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Reset because we don't care about the initial list of resets
|
||||||
|
resetter.Resets = nil
|
||||||
|
|
||||||
|
collection.Reset(test.refreshedSchemas)
|
||||||
|
err = tracker.OnSchemas(collection)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, test.expectedResets, resetter.Resets)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@@ -294,17 +294,17 @@ func (mr *MockCacheFactoryMockRecorder) DoneWithCache(arg0 any) *gomock.Call {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Stop mocks base method.
|
// Stop mocks base method.
|
||||||
func (m *MockCacheFactory) Stop() error {
|
func (m *MockCacheFactory) Stop(gvk schema.GroupVersionKind) error {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "Stop")
|
ret := m.ctrl.Call(m, "Stop", gvk)
|
||||||
ret0, _ := ret[0].(error)
|
ret0, _ := ret[0].(error)
|
||||||
return ret0
|
return ret0
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop indicates an expected call of Stop.
|
// Stop indicates an expected call of Stop.
|
||||||
func (mr *MockCacheFactoryMockRecorder) Stop() *gomock.Call {
|
func (mr *MockCacheFactoryMockRecorder) Stop(gvk any) *gomock.Call {
|
||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockCacheFactory)(nil).Stop))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockCacheFactory)(nil).Stop), gvk)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MockSchemaColumnSetter is a mock of SchemaColumnSetter interface.
|
// MockSchemaColumnSetter is a mock of SchemaColumnSetter interface.
|
||||||
|
@@ -302,7 +302,7 @@ type CacheFactoryInitializer func() (CacheFactory, error)
|
|||||||
type CacheFactory interface {
|
type CacheFactory interface {
|
||||||
CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*factory.Cache, error)
|
CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*factory.Cache, error)
|
||||||
DoneWithCache(*factory.Cache)
|
DoneWithCache(*factory.Cache)
|
||||||
Stop() error
|
Stop(gvk schema.GroupVersionKind) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewProxyStore returns a Store implemented directly on top of kubernetes.
|
// NewProxyStore returns a Store implemented directly on top of kubernetes.
|
||||||
@@ -334,19 +334,21 @@ func NewProxyStore(ctx context.Context, c SchemaColumnSetter, clientGetter Clien
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Reset locks the store, resets the underlying cache factory, and warm the namespace cache.
|
// Reset locks the store, resets the underlying cache factory, and warm the namespace cache.
|
||||||
func (s *Store) Reset() error {
|
func (s *Store) Reset(gvk schema.GroupVersionKind) error {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
if s.namespaceCache != nil {
|
if s.namespaceCache != nil && gvk == namespaceGVK {
|
||||||
s.cacheFactory.DoneWithCache(s.namespaceCache)
|
s.cacheFactory.DoneWithCache(s.namespaceCache)
|
||||||
}
|
}
|
||||||
if err := s.cacheFactory.Stop(); err != nil {
|
if err := s.cacheFactory.Stop(gvk); err != nil {
|
||||||
return fmt.Errorf("reset: %w", err)
|
return fmt.Errorf("reset: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if gvk == namespaceGVK {
|
||||||
if err := s.initializeNamespaceCache(); err != nil {
|
if err := s.initializeNamespaceCache(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -33,6 +33,7 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
schema2 "k8s.io/apimachinery/pkg/runtime/schema"
|
schema2 "k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/authentication/user"
|
"k8s.io/apiserver/pkg/authentication/user"
|
||||||
@@ -726,13 +727,14 @@ func TestReset(t *testing.T) {
|
|||||||
transformBuilder: tb,
|
transformBuilder: tb,
|
||||||
}
|
}
|
||||||
nsSchema := baseNSSchema
|
nsSchema := baseNSSchema
|
||||||
cf.EXPECT().Stop().Return(nil)
|
gvk := attributes.GVK(&nsSchema)
|
||||||
|
cf.EXPECT().Stop(gvk).Return(nil)
|
||||||
cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil)
|
cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil)
|
||||||
cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil)
|
cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil)
|
||||||
cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nsc, nil)
|
cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nsc, nil)
|
||||||
cf.EXPECT().DoneWithCache(nsc)
|
cf.EXPECT().DoneWithCache(nsc)
|
||||||
tb.EXPECT().GetTransformFunc(attributes.GVK(&nsSchema), gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil })
|
tb.EXPECT().GetTransformFunc(gvk, gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil })
|
||||||
err := s.Reset()
|
err := s.Reset(gvk)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.Equal(t, nsc, s.namespaceCache)
|
assert.Equal(t, nsc, s.namespaceCache)
|
||||||
},
|
},
|
||||||
@@ -754,8 +756,9 @@ func TestReset(t *testing.T) {
|
|||||||
transformBuilder: tb,
|
transformBuilder: tb,
|
||||||
}
|
}
|
||||||
|
|
||||||
cf.EXPECT().Stop().Return(fmt.Errorf("error"))
|
gvk := schema.GroupVersionKind{}
|
||||||
err := s.Reset()
|
cf.EXPECT().Stop(gvk).Return(fmt.Errorf("error"))
|
||||||
|
err := s.Reset(gvk)
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -776,9 +779,12 @@ func TestReset(t *testing.T) {
|
|||||||
transformBuilder: tb,
|
transformBuilder: tb,
|
||||||
}
|
}
|
||||||
|
|
||||||
cf.EXPECT().Stop().Return(nil)
|
nsSchema := baseNSSchema
|
||||||
|
gvk := attributes.GVK(&nsSchema)
|
||||||
|
|
||||||
|
cf.EXPECT().Stop(gvk).Return(nil)
|
||||||
cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(fmt.Errorf("error"))
|
cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(fmt.Errorf("error"))
|
||||||
err := s.Reset()
|
err := s.Reset(gvk)
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -799,11 +805,12 @@ func TestReset(t *testing.T) {
|
|||||||
transformBuilder: tb,
|
transformBuilder: tb,
|
||||||
}
|
}
|
||||||
nsSchema := baseNSSchema
|
nsSchema := baseNSSchema
|
||||||
|
gvk := attributes.GVK(&nsSchema)
|
||||||
|
|
||||||
cf.EXPECT().Stop().Return(nil)
|
cf.EXPECT().Stop(gvk).Return(nil)
|
||||||
cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil)
|
cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil)
|
||||||
cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(nil, fmt.Errorf("error"))
|
cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(nil, fmt.Errorf("error"))
|
||||||
err := s.Reset()
|
err := s.Reset(gvk)
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -825,13 +832,14 @@ func TestReset(t *testing.T) {
|
|||||||
transformBuilder: tb,
|
transformBuilder: tb,
|
||||||
}
|
}
|
||||||
nsSchema := baseNSSchema
|
nsSchema := baseNSSchema
|
||||||
|
gvk := attributes.GVK(&nsSchema)
|
||||||
|
|
||||||
cf.EXPECT().Stop().Return(nil)
|
cf.EXPECT().Stop(gvk).Return(nil)
|
||||||
cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil)
|
cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil)
|
||||||
cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil)
|
cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil)
|
||||||
cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nil, fmt.Errorf("error"))
|
cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nil, fmt.Errorf("error"))
|
||||||
tb.EXPECT().GetTransformFunc(attributes.GVK(&nsSchema), gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil })
|
tb.EXPECT().GetTransformFunc(gvk, gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil })
|
||||||
err := s.Reset()
|
err := s.Reset(gvk)
|
||||||
assert.NotNil(t, err)
|
assert.NotNil(t, err)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
Reference in New Issue
Block a user