From 13d5ad3ccbe5f4b7d3965e5c8c804805eb56161a Mon Sep 17 00:00:00 2001 From: Tom Lebreux Date: Wed, 10 Sep 2025 17:04:25 -0400 Subject: [PATCH] Stop single caches instead of all of them (#812) * Revert OnSchemas change work * Track schema changes * Only stop a single GVK informer factory * Add tests * Rename crd to crdClient * Rename s to sqlStore * Don't wait for synced caches if request is canceled * Move schematracker to pkg/sqlcache/schematracker --- pkg/controllers/schema/schemas.go | 127 ++-------- pkg/server/server.go | 68 ++---- .../informer/factory/informer_factory.go | 152 +++++++----- .../informer/factory/informer_factory_test.go | 77 +++++- pkg/sqlcache/integration_test.go | 2 +- pkg/sqlcache/schematracker/schema.go | 78 ++++++ pkg/sqlcache/schematracker/schema_test.go | 231 ++++++++++++++++++ pkg/stores/sqlproxy/proxy_mocks_test.go | 8 +- pkg/stores/sqlproxy/proxy_store.go | 14 +- pkg/stores/sqlproxy/proxy_store_test.go | 32 ++- 10 files changed, 543 insertions(+), 246 deletions(-) create mode 100644 pkg/sqlcache/schematracker/schema.go create mode 100644 pkg/sqlcache/schematracker/schema_test.go diff --git a/pkg/controllers/schema/schemas.go b/pkg/controllers/schema/schemas.go index 074be392..db663205 100644 --- a/pkg/controllers/schema/schemas.go +++ b/pkg/controllers/schema/schemas.go @@ -19,7 +19,6 @@ import ( authorizationv1 "k8s.io/api/authorization/v1" apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8sapimachineryschema "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" authorizationv1client "k8s.io/client-go/kubernetes/typed/authorization/v1" apiv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" @@ -32,31 +31,23 @@ var ( } ) -type SchemasHandlerFunc func(schemas *schema2.Collection, changedSchemas map[string]*types.APISchema, deletedSomething bool) error +type SchemasHandlerFunc func(schemas *schema2.Collection) error -func (s SchemasHandlerFunc) OnSchemas(schemas *schema2.Collection, changedSchemas map[string]*types.APISchema, forceChange bool) error { - return s(schemas, changedSchemas, forceChange) +func (s SchemasHandlerFunc) OnSchemas(schemas *schema2.Collection) error { + return s(schemas) } type handler struct { sync.Mutex - // refreshLock prevents refreshAll to be run in parallel - refreshLock sync.Mutex - - ctx context.Context - toSync int32 - schemas *schema2.Collection - client discovery.DiscoveryInterface - cols *common.DynamicColumns - crdClient apiextcontrollerv1.CustomResourceDefinitionClient - ssar authorizationv1client.SelfSubjectAccessReviewInterface - handler SchemasHandlerFunc - changedIDs map[k8sapimachineryschema.GroupVersionKind]bool - createdCRDs map[k8sapimachineryschema.GroupVersionKind]bool - deletedCRDs map[k8sapimachineryschema.GroupVersionKind]bool - apiServiceChanged bool - gvksFromKeys map[string][]k8sapimachineryschema.GroupVersionKind + ctx context.Context + toSync int32 + schemas *schema2.Collection + client discovery.DiscoveryInterface + cols *common.DynamicColumns + crdClient apiextcontrollerv1.CustomResourceDefinitionClient + ssar authorizationv1client.SelfSubjectAccessReviewInterface + handler SchemasHandlerFunc } func Register(ctx context.Context, @@ -69,65 +60,25 @@ func Register(ctx context.Context, schemas *schema2.Collection) { h := &handler{ - ctx: ctx, - cols: cols, - client: discovery, - schemas: schemas, - handler: schemasHandler, - crdClient: crd, - ssar: ssar, - changedIDs: make(map[k8sapimachineryschema.GroupVersionKind]bool), - gvksFromKeys: make(map[string][]k8sapimachineryschema.GroupVersionKind), - createdCRDs: make(map[k8sapimachineryschema.GroupVersionKind]bool), - deletedCRDs: make(map[k8sapimachineryschema.GroupVersionKind]bool), + ctx: ctx, + cols: cols, + client: discovery, + schemas: schemas, + handler: schemasHandler, + crdClient: crd, + ssar: ssar, } apiService.OnChange(ctx, "schema", h.OnChangeAPIService) crd.OnChange(ctx, "schema", h.OnChangeCRD) } -func (h *handler) handleDeletedCRD(key string, crd *apiextv1.CustomResourceDefinition) { - h.Lock() - defer h.Unlock() - gvkList, ok := h.gvksFromKeys[key] - if !ok { - logrus.Infof("No associated GVK for CRD key %s", key) - return - } - for _, gvk := range gvkList { - h.deletedCRDs[gvk] = true - } - delete(h.gvksFromKeys, key) // Don't need this anymore - h.queueRefresh() -} - func (h *handler) OnChangeCRD(key string, crd *apiextv1.CustomResourceDefinition) (*apiextv1.CustomResourceDefinition, error) { - if crd == nil { - h.handleDeletedCRD(key, crd) - return crd, nil - } - spec := crd.Spec - group := spec.Group - kind := spec.Names.Kind - gvkList := make([]k8sapimachineryschema.GroupVersionKind, len(spec.Versions)) - h.Lock() - defer h.Unlock() - for i, version := range spec.Versions { - gvk := k8sapimachineryschema.GroupVersionKind{Group: group, Version: version.Name, Kind: kind} - gvkList[i] = gvk - h.changedIDs[gvk] = true - _, ok := h.gvksFromKeys[key] - if !ok { - h.createdCRDs[gvk] = true - } - } - h.gvksFromKeys[key] = gvkList h.queueRefresh() return crd, nil } func (h *handler) OnChangeAPIService(key string, api *apiv1.APIService) (*apiv1.APIService, error) { - h.apiServiceChanged = true h.queueRefresh() return api, nil } @@ -137,34 +88,7 @@ func (h *handler) queueRefresh() { go func() { time.Sleep(500 * time.Millisecond) - var err error - var changedIDs map[k8sapimachineryschema.GroupVersionKind]bool - var deletedCRDs map[k8sapimachineryschema.GroupVersionKind]bool - var createdCRDs map[k8sapimachineryschema.GroupVersionKind]bool - var apiServiceChanged bool - h.Lock() - if len(h.createdCRDs) > 0 { - createdCRDs = h.createdCRDs - h.createdCRDs = make(map[k8sapimachineryschema.GroupVersionKind]bool) - } - if len(h.deletedCRDs) > 0 { - deletedCRDs = h.deletedCRDs - h.deletedCRDs = make(map[k8sapimachineryschema.GroupVersionKind]bool) - } - if len(h.changedIDs) > 0 { - changedIDs = h.changedIDs - h.changedIDs = make(map[k8sapimachineryschema.GroupVersionKind]bool) - } - if h.apiServiceChanged { - apiServiceChanged = true - h.apiServiceChanged = false - } - h.Unlock() - crdNumCountChanged := len(deletedCRDs) > 0 || len(createdCRDs) > 0 - if len(changedIDs) > 0 || apiServiceChanged || crdNumCountChanged { - err = h.refreshAll(h.ctx, changedIDs, crdNumCountChanged) - } - if err != nil { + if err := h.refreshAll(h.ctx); err != nil { logrus.Errorf("failed to sync schemas: %v", err) atomic.StoreInt32(&h.toSync, 1) } @@ -224,9 +148,9 @@ func (h *handler) getColumns(ctx context.Context, schemas map[string]*types.APIS return eg.Wait() } -func (h *handler) refreshAll(ctx context.Context, changedGVKs map[k8sapimachineryschema.GroupVersionKind]bool, forceChange bool) error { - h.refreshLock.Lock() - defer h.refreshLock.Unlock() +func (h *handler) refreshAll(ctx context.Context) error { + h.Lock() + defer h.Unlock() if !h.needToSync() { return nil @@ -238,7 +162,6 @@ func (h *handler) refreshAll(ctx context.Context, changedGVKs map[k8sapimachiner } filteredSchemas := map[string]*types.APISchema{} - changedSchemasByID := map[string]*types.APISchema{} for _, schema := range schemas { if IsListWatchable(schema) { if preferredTypeExists(schema, schemas) { @@ -258,10 +181,6 @@ func (h *handler) refreshAll(ctx context.Context, changedGVKs map[k8sapimachiner schema.PluralName = converter.GVRToPluralName(gvr) } filteredSchemas[schema.ID] = schema - if changedGVKs[gvk] { - // nil[x] is always false if the first-time runner called this - changedSchemasByID[schema.ID] = schema - } } if err := h.getColumns(h.ctx, filteredSchemas); err != nil { @@ -270,7 +189,7 @@ func (h *handler) refreshAll(ctx context.Context, changedGVKs map[k8sapimachiner h.schemas.Reset(filteredSchemas) if h.handler != nil { - return h.handler.OnSchemas(h.schemas, changedSchemasByID, forceChange) + return h.handler.OnSchemas(h.schemas) } return nil diff --git a/pkg/server/server.go b/pkg/server/server.go index cdf5e55d..7a1570ec 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -5,8 +5,6 @@ import ( "errors" "fmt" "net/http" - "slices" - "sync" apiserver "github.com/rancher/apiserver/pkg/server" "github.com/rancher/apiserver/pkg/types" @@ -26,12 +24,12 @@ import ( "github.com/rancher/steve/pkg/server/handler" "github.com/rancher/steve/pkg/server/router" "github.com/rancher/steve/pkg/sqlcache/informer/factory" + "github.com/rancher/steve/pkg/sqlcache/schematracker" metricsStore "github.com/rancher/steve/pkg/stores/metrics" "github.com/rancher/steve/pkg/stores/proxy" "github.com/rancher/steve/pkg/stores/sqlpartition" "github.com/rancher/steve/pkg/stores/sqlproxy" "github.com/rancher/steve/pkg/summarycache" - "github.com/sirupsen/logrus" "k8s.io/client-go/rest" ) @@ -235,63 +233,25 @@ func setup(ctx context.Context, server *Server) error { for _, template := range resources.DefaultSchemaTemplatesForStore(store, server.BaseSchemas, summaryCache, asl, server.controllers.K8s.Discovery(), common.TemplateOptions{InSQLMode: true}) { sf.AddTemplate(template) } - mutex := &sync.Mutex{} - fieldsForSchema := make(map[string][][]string) // map schemaID to fields - initializedDB := false - onSchemasHandler = func(schemas *schema.Collection, changedSchemas map[string]*types.APISchema, deletedSomething bool) error { - resetEverything := false - // We need a mutex around the fieldsForSchema closure because this handler is invoked asynchronously - // from the server - mutex.Lock() - if !initializedDB { - initializedDB = true - resetEverything = true - for _, id := range schemas.IDs() { - theSchema := schemas.Schema(id) - if theSchema == nil { - fieldsForSchema[id] = [][]string{} - continue - } - fieldsForSchema[id] = sqlproxy.GetFieldsFromSchema(theSchema) - } - logrus.Debugf("onSchemasHandler: need to reset everything on first run") - } else { - for id, theSchema := range changedSchemas { - oldFields, ok := fieldsForSchema[id] - newFields := sqlproxy.GetFieldsFromSchema(theSchema) - if !ok || !slices.EqualFunc(oldFields, newFields, - func(s1, s2 []string) bool { - return slices.Equal(s1, s2) - }) { - resetEverything = true - } - fieldsForSchema[id] = newFields - } - if deletedSomething { - resetEverything = true - } - logrus.Debugf("onSchemasHandler: need to reset everything: %t", resetEverything) - } - mutex.Unlock() - if !resetEverything { - return nil - } - if err := ccache.OnSchemas(schemas); err != nil { - return err - } - if err := sqlStore.Reset(); err != nil { - return err - } - return nil + sqlSchemaTracker := schematracker.NewSchemaTracker(sqlStore) + + onSchemasHandler = func(schemas *schema.Collection) error { + var retErr error + + err := ccache.OnSchemas(schemas) + retErr = errors.Join(retErr, err) + + err = sqlSchemaTracker.OnSchemas(schemas) + retErr = errors.Join(retErr, err) + + return retErr } } else { for _, template := range resources.DefaultSchemaTemplates(cf, server.BaseSchemas, summaryCache, asl, server.controllers.K8s.Discovery(), server.controllers.Core.Namespace().Cache(), common.TemplateOptions{InSQLMode: false}) { sf.AddTemplate(template) } - onSchemasHandler = func(schemas *schema.Collection, _ map[string]*types.APISchema, _ bool) error { - return ccache.OnSchemas(schemas) - } + onSchemasHandler = ccache.OnSchemas } schemas.SetupWatcher(ctx, server.BaseSchemas, asl, sf) diff --git a/pkg/sqlcache/informer/factory/informer_factory.go b/pkg/sqlcache/informer/factory/informer_factory.go index 644a2216..72d0bd77 100644 --- a/pkg/sqlcache/informer/factory/informer_factory.go +++ b/pkg/sqlcache/informer/factory/informer_factory.go @@ -28,14 +28,12 @@ const EncryptAllEnvVar = "CATTLE_ENCRYPT_CACHE_ALL" // CacheFactory builds Informer instances and keeps a cache of instances it created type CacheFactory struct { - wg wait.Group dbClient db.Client // ctx determines when informers need to stop ctx context.Context cancel context.CancelFunc - mutex sync.RWMutex encryptAll bool gcInterval time.Duration @@ -49,13 +47,31 @@ type CacheFactory struct { type guardedInformer struct { informer *informer.Informer - mutex *sync.Mutex + // informerMutex ensures informer is only set by one goroutine even if + // multiple concurrent calls to CacheFor are made + informerMutex *sync.Mutex + + // stopMutex ensures no CacheFor call can be made for a given GVK when + // a Stop call is ongoing. + // + // CacheFactory.informersMutex is not enough because part of the code + // might still have an old cache from a previous CacheFor call. + stopMutex *sync.RWMutex + + ctx context.Context + cancel context.CancelFunc + wg wait.Group } type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespace bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) type Cache struct { informer.ByOptionsLister + gvk schema.GroupVersionKind +} + +func (c *Cache) GVK() schema.GroupVersionKind { + return c.gvk } var defaultEncryptedResourceTypes = map[schema.GroupVersionKind]struct{}{ @@ -90,8 +106,6 @@ func NewCacheFactory(opts CacheFactoryOptions) (*CacheFactory, error) { } ctx, cancel := context.WithCancel(context.Background()) return &CacheFactory{ - wg: wait.Group{}, - ctx: ctx, cancel: cancel, @@ -111,17 +125,6 @@ func NewCacheFactory(opts CacheFactoryOptions) (*CacheFactory, error) { // // Don't forget to call DoneWithCache with the given informer once done with it. func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*Cache, error) { - // First of all block Reset() until we are done - f.mutex.RLock() - cache, err := f.cacheForLocked(ctx, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, namespaced, watchable) - if err != nil { - f.mutex.RUnlock() - return nil, err - } - return cache, nil -} - -func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*Cache, error) { // Second, check if the informer and its accompanying informer-specific mutex exist already in the informers cache // If not, start by creating such informer-specific mutex. That is used later to ensure no two goroutines create // informers for the same GVK at the same type @@ -130,17 +133,32 @@ func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, ex // that blocks CacheFor for other GVKs, hence not deferring unlock here gi, ok := f.informers[gvk] if !ok { + giCtx, giCancel := context.WithCancel(f.ctx) gi = &guardedInformer{ - informer: nil, - mutex: &sync.Mutex{}, + informer: nil, + informerMutex: &sync.Mutex{}, + stopMutex: &sync.RWMutex{}, + ctx: giCtx, + cancel: giCancel, } f.informers[gvk] = gi } f.informersMutex.Unlock() + // Prevent Stop() to be called for that GVK + gi.stopMutex.RLock() + + gvkCache, err := f.cacheForLocked(ctx, gi, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, namespaced, watchable) + if err != nil { + gi.stopMutex.RUnlock() + return nil, err + } + return gvkCache, nil +} + +func (f *CacheFactory) cacheForLocked(ctx context.Context, gi *guardedInformer, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*Cache, error) { // At this point an informer-specific mutex (gi.mutex) is guaranteed to exist. Lock it - gi.mutex.Lock() - defer gi.mutex.Unlock() + gi.informerMutex.Lock() // Then: if the informer really was not created yet (first time here or previous times have errored out) // actually create the informer @@ -155,8 +173,9 @@ func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, ex shouldEncrypt := f.encryptAll || encryptResourceAlways // In non-test code this invokes pkg/sqlcache/informer/informer.go: NewInformer() // search for "func NewInformer(ctx" - i, err := f.newInformer(f.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, f.gcInterval, f.gcKeepCount) + i, err := f.newInformer(gi.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, namespaced, watchable, f.gcInterval, f.gcKeepCount) if err != nil { + gi.informerMutex.Unlock() return nil, err } @@ -168,69 +187,92 @@ func (f *CacheFactory) cacheForLocked(ctx context.Context, fields [][]string, ex cache.DefaultWatchErrorHandler(ctx, r, err) }) if err != nil { + gi.informerMutex.Unlock() return nil, err } - f.wg.StartWithChannel(f.ctx.Done(), i.Run) + gi.wg.StartWithChannel(gi.ctx.Done(), i.Run) gi.informer = i } + gi.informerMutex.Unlock() - if !cache.WaitForCacheSync(f.ctx.Done(), gi.informer.HasSynced) { + // We don't want to get stuck in WaitForCachesSync if the request from + // the client has been canceled. + waitCh := make(chan struct{}, 1) + go func() { + select { + case <-ctx.Done(): + close(waitCh) + case <-gi.ctx.Done(): + close(waitCh) + } + }() + + if !cache.WaitForCacheSync(waitCh, gi.informer.HasSynced) { return nil, fmt.Errorf("failed to sync SQLite Informer cache for GVK %v", gvk) } // At this point the informer is ready, return it - return &Cache{ByOptionsLister: gi.informer}, nil + return &Cache{ByOptionsLister: gi.informer, gvk: gvk}, nil } -// DoneWithCache must be called for every CacheFor call. +// DoneWithCache must be called for every successful CacheFor call. The Cache should +// no longer be used after DoneWithCache is called. // // This ensures that there aren't any inflight list requests while we are resetting the database. -// -// TODO: Use the *Cache once we go per-GVK -func (f *CacheFactory) DoneWithCache(_ *Cache) { - f.mutex.RUnlock() +func (f *CacheFactory) DoneWithCache(cache *Cache) { + if cache == nil { + return + } + + f.informersMutex.Lock() + defer f.informersMutex.Unlock() + + // Note: the informers cache is protected by informersMutex, which we don't want to hold for very long because + // that blocks CacheFor for other GVKs, hence not deferring unlock here + gi, ok := f.informers[cache.gvk] + if !ok { + return + } + + gi.stopMutex.RUnlock() } // Stop cancels ctx which stops any running informers, assigns a new ctx, resets the GVK-informer cache, and resets // the database connection which wipes any current sqlite database at the default location. -func (f *CacheFactory) Stop() error { +func (f *CacheFactory) Stop(gvk schema.GroupVersionKind) error { if f.dbClient == nil { // nothing to reset return nil } - // We must stop informers here to unblock those stuck in WaitForCacheSync - // which is blocking DoneWithCache call. - // - // This is fine without a lock as long as multiple Stop() call aren't made - // concurrently (which they currently aren't) - f.cancel() - - // Prevent more CacheFor calls - f.mutex.Lock() - defer f.mutex.Unlock() - - // Wait for all informers to have exited - f.wg.Wait() - - f.ctx, f.cancel = context.WithCancel(context.Background()) - - // and get rid of all references to those informers and their mutexes f.informersMutex.Lock() defer f.informersMutex.Unlock() - for gvk, informer := range f.informers { - // DropAll needs its own context because the context from the CacheFactory - // is canceled - err := informer.informer.DropAll(context.Background()) - if err != nil { - return fmt.Errorf("dropall %q: %w", gvk, err) - } + gi, ok := f.informers[gvk] + if !ok { + return nil } + delete(f.informers, gvk) - f.informers = make(map[schema.GroupVersionKind]*guardedInformer) + // We must stop informers here to unblock those stuck in WaitForCacheSync + // which is blocking DoneWithCache call. + gi.cancel() + + // Prevent other CacheFor calls for that GVK + gi.stopMutex.Lock() + defer gi.stopMutex.Unlock() + + // Wait for all informers to have exited + gi.wg.Wait() + + // DropAll needs its own context because the context from the informer + // is canceled + err := gi.informer.DropAll(context.Background()) + if err != nil { + return fmt.Errorf("dropall %q: %w", gvk, err) + } return nil } diff --git a/pkg/sqlcache/informer/factory/informer_factory_test.go b/pkg/sqlcache/informer/factory/informer_factory_test.go index 5fcd484e..4cf475b7 100644 --- a/pkg/sqlcache/informer/factory/informer_factory_test.go +++ b/pkg/sqlcache/informer/factory/informer_factory_test.go @@ -79,6 +79,7 @@ func TestCacheFor(t *testing.T) { } expectedC := &Cache{ ByOptionsLister: i, + gvk: expectedGVK, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) @@ -100,7 +101,7 @@ func TestCacheFor(t *testing.T) { go func() { // this function ensures that ctx is open for the duration of this test but if part of a longer process it will be closed eventually time.Sleep(5 * time.Second) - f.Stop() + f.Stop(expectedGVK) }() c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) @@ -148,13 +149,63 @@ func TestCacheFor(t *testing.T) { go func() { time.Sleep(1 * time.Second) - f.Stop() + f.Stop(expectedGVK) }() - var err error - _, err = f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) + _, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.NotNil(t, err) time.Sleep(2 * time.Second) }}) + tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning false, request is canceled", test: func(t *testing.T) { + dbClient := NewMockClient(gomock.NewController(t)) + dynamicClient := NewMockResourceInterface(gomock.NewController(t)) + fields := [][]string{{"something"}} + expectedGVK := schema.GroupVersionKind{} + + bloi := NewMockByOptionsLister(gomock.NewController(t)) + bloi.EXPECT().RunGC(gomock.Any()).AnyTimes() + bloi.EXPECT().DropAll(gomock.Any()).AnyTimes() + sii := NewMockSharedIndexInformer(gomock.NewController(t)) + sii.EXPECT().HasSynced().Return(false).AnyTimes() + sii.EXPECT().Run(gomock.Any()) + sii.EXPECT().SetWatchErrorHandler(gomock.Any()) + expectedI := &informer.Informer{ + // need to set this so Run function is not nil + SharedIndexInformer: sii, + ByOptionsLister: bloi, + } + testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { + assert.Equal(t, client, dynamicClient) + assert.Equal(t, fields, fields) + assert.Equal(t, expectedGVK, gvk) + assert.Equal(t, db, dbClient) + assert.Equal(t, false, shouldEncrypt) + assert.Equal(t, 0, gcKeepCount) + assert.Nil(t, externalUpdateInfo) + return expectedI, nil + } + f := &CacheFactory{ + dbClient: dbClient, + newInformer: testNewInformer, + informers: map[schema.GroupVersionKind]*guardedInformer{}, + } + f.ctx, f.cancel = context.WithCancel(context.Background()) + + errCh := make(chan error, 1) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Second) + defer cancel() + _, err := f.CacheFor(ctx, fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) + errCh <- err + }() + + select { + case err := <-errCh: + assert.NotNil(t, err) + case <-time.After(2 * time.Second): + assert.Fail(t, "CacheFor never exited") + } + time.Sleep(2 * time.Second) + }}) tests = append(tests, testCase{description: "CacheFor() with no errors returned, HasSync returning true, and ctx is canceled, should not call Run() more than once and not return an error", test: func(t *testing.T) { dbClient := NewMockClient(gomock.NewController(t)) dynamicClient := NewMockResourceInterface(gomock.NewController(t)) @@ -176,6 +227,7 @@ func TestCacheFor(t *testing.T) { } expectedC := &Cache{ ByOptionsLister: i, + gvk: expectedGVK, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) @@ -193,7 +245,7 @@ func TestCacheFor(t *testing.T) { informers: map[schema.GroupVersionKind]*guardedInformer{}, } f.ctx, f.cancel = context.WithCancel(context.Background()) - f.Stop() + f.Stop(expectedGVK) c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) @@ -219,6 +271,7 @@ func TestCacheFor(t *testing.T) { } expectedC := &Cache{ ByOptionsLister: i, + gvk: expectedGVK, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) @@ -240,7 +293,7 @@ func TestCacheFor(t *testing.T) { go func() { time.Sleep(10 * time.Second) - f.Stop() + f.Stop(expectedGVK) }() c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) @@ -271,6 +324,7 @@ func TestCacheFor(t *testing.T) { } expectedC := &Cache{ ByOptionsLister: i, + gvk: expectedGVK, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) @@ -292,7 +346,7 @@ func TestCacheFor(t *testing.T) { go func() { time.Sleep(10 * time.Second) - f.Stop() + f.Stop(expectedGVK) }() c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) @@ -322,6 +376,7 @@ func TestCacheFor(t *testing.T) { } expectedC := &Cache{ ByOptionsLister: i, + gvk: expectedGVK, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) @@ -343,7 +398,7 @@ func TestCacheFor(t *testing.T) { go func() { time.Sleep(10 * time.Second) - f.Stop() + f.Stop(expectedGVK) }() c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) assert.Nil(t, err) @@ -373,6 +428,7 @@ func TestCacheFor(t *testing.T) { } expectedC := &Cache{ ByOptionsLister: i, + gvk: expectedGVK, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { // we can't test func == func, so instead we check if the output was as expected @@ -402,7 +458,7 @@ func TestCacheFor(t *testing.T) { go func() { // this function ensures that ctx is not canceled for the duration of this test but if part of a longer process it will be closed eventually time.Sleep(5 * time.Second) - f.Stop() + f.Stop(expectedGVK) }() var c *Cache var err error @@ -430,6 +486,7 @@ func TestCacheFor(t *testing.T) { } expectedC := &Cache{ ByOptionsLister: i, + gvk: expectedGVK, } testNewInformer := func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, namespaced bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error) { assert.Equal(t, client, dynamicClient) @@ -454,7 +511,7 @@ func TestCacheFor(t *testing.T) { go func() { time.Sleep(10 * time.Second) - f.Stop() + f.Stop(expectedGVK) }() // CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) c, err := f.CacheFor(context.Background(), fields, nil, nil, nil, dynamicClient, expectedGVK, false, true) diff --git a/pkg/sqlcache/integration_test.go b/pkg/sqlcache/integration_test.go index 33b91fba..a171c762 100644 --- a/pkg/sqlcache/integration_test.go +++ b/pkg/sqlcache/integration_test.go @@ -97,7 +97,7 @@ func (i *IntegrationSuite) TestSQLCacheFilters() { require.NoError(err) defer func() { cacheFactory.DoneWithCache(cache) - cacheFactory.Stop() + cacheFactory.Stop(cache.GVK()) }() // doesn't match the filter for somekey == somevalue diff --git a/pkg/sqlcache/schematracker/schema.go b/pkg/sqlcache/schematracker/schema.go new file mode 100644 index 00000000..7f1bc8ff --- /dev/null +++ b/pkg/sqlcache/schematracker/schema.go @@ -0,0 +1,78 @@ +package schematracker + +import ( + "errors" + "slices" + + "github.com/rancher/steve/pkg/attributes" + "github.com/rancher/steve/pkg/resources/common" + "github.com/rancher/steve/pkg/schema" + k8sschema "k8s.io/apimachinery/pkg/runtime/schema" +) + +type Resetter interface { + Reset(k8sschema.GroupVersionKind) error +} + +type SchemaTracker struct { + knownSchemas map[k8sschema.GroupVersionKind][]common.ColumnDefinition + resetter Resetter +} + +func NewSchemaTracker(resetter Resetter) *SchemaTracker { + return &SchemaTracker{ + knownSchemas: make(map[k8sschema.GroupVersionKind][]common.ColumnDefinition), + resetter: resetter, + } +} + +func (s *SchemaTracker) OnSchemas(schemas *schema.Collection) error { + knownSchemas := make(map[k8sschema.GroupVersionKind][]common.ColumnDefinition) + + needsReset := make(map[k8sschema.GroupVersionKind]struct{}) + + deletedSchemas := make(map[k8sschema.GroupVersionKind]struct{}) + for gvk := range s.knownSchemas { + deletedSchemas[gvk] = struct{}{} + } + + for _, id := range schemas.IDs() { + theSchema := schemas.Schema(id) + if theSchema == nil { + continue + } + + gvk := attributes.GVK(theSchema) + + cols := common.GetColumnDefinitions(theSchema) + + knownSchemas[gvk] = cols + + oldCols, exists := s.knownSchemas[gvk] + if exists { + if !slices.Equal(cols, oldCols) { + needsReset[gvk] = struct{}{} + } + } else { + needsReset[gvk] = struct{}{} + } + + // Schema is still there so it hasn't been deleted + delete(deletedSchemas, gvk) + } + + // All deleted schemas must be resetted as well + for gvk := range deletedSchemas { + needsReset[gvk] = struct{}{} + } + + // Reset known schemas + var retErr error + for gvk := range needsReset { + err := s.resetter.Reset(gvk) + retErr = errors.Join(retErr, err) + } + + s.knownSchemas = knownSchemas + return retErr +} diff --git a/pkg/sqlcache/schematracker/schema_test.go b/pkg/sqlcache/schematracker/schema_test.go new file mode 100644 index 00000000..4e26125a --- /dev/null +++ b/pkg/sqlcache/schematracker/schema_test.go @@ -0,0 +1,231 @@ +package schematracker + +import ( + "context" + "testing" + + "github.com/rancher/apiserver/pkg/types" + "github.com/rancher/steve/pkg/attributes" + "github.com/rancher/steve/pkg/resources/common" + "github.com/rancher/steve/pkg/schema" + "github.com/rancher/wrangler/v3/pkg/schemas" + "github.com/stretchr/testify/assert" + k8sschema "k8s.io/apimachinery/pkg/runtime/schema" +) + +type testResetter struct { + Resets map[k8sschema.GroupVersionKind]struct{} +} + +func (r *testResetter) Reset(gvk k8sschema.GroupVersionKind) error { + if r.Resets == nil { + r.Resets = make(map[k8sschema.GroupVersionKind]struct{}) + } + r.Resets[gvk] = struct{}{} + return nil +} + +func TestSchemaTracker(t *testing.T) { + pods := &types.APISchema{ + Schema: &schemas.Schema{ID: "pods"}, + } + attributes.SetGVK(pods, k8sschema.GroupVersionKind{ + Version: "v1", + Kind: "Pod", + }) + attributes.SetGVR(pods, k8sschema.GroupVersionResource{ + Version: "v1", + Resource: "pods", + }) + + configmaps := &types.APISchema{ + Schema: &schemas.Schema{ID: "configmaps"}, + } + attributes.SetGVK(configmaps, k8sschema.GroupVersionKind{ + Version: "v1", + Kind: "ConfigMap", + }) + attributes.SetGVR(configmaps, k8sschema.GroupVersionResource{ + Version: "v1", + Resource: "configmaps", + }) + + foo := &types.APISchema{ + Schema: &schemas.Schema{ID: "test.io.foos"}, + } + attributes.SetGVK(foo, k8sschema.GroupVersionKind{ + Group: "test.io", + Version: "v1", + Kind: "Foo", + }) + attributes.SetGVR(foo, k8sschema.GroupVersionResource{ + Group: "test.io", + Version: "v1", + Resource: "foos", + }) + + foos1 := &types.APISchema{ + Schema: &schemas.Schema{ID: "test.io.foos"}, + } + attributes.SetGVK(foos1, k8sschema.GroupVersionKind{ + Group: "test.io", + Version: "v1", + Kind: "Foo", + }) + attributes.SetGVR(foos1, k8sschema.GroupVersionResource{ + Group: "test.io", + Version: "v1", + Resource: "foos", + }) + attributes.SetColumns(foos1, []common.ColumnDefinition{ + {Field: "field1"}, {Field: "field2"}, + }) + + foos2 := &types.APISchema{ + Schema: &schemas.Schema{ID: "test.io.foos"}, + } + attributes.SetGVK(foos2, k8sschema.GroupVersionKind{ + Group: "test.io", + Version: "v1", + Kind: "Foo", + }) + attributes.SetGVR(foos2, k8sschema.GroupVersionResource{ + Group: "test.io", + Version: "v1", + Resource: "foos", + }) + attributes.SetColumns(foos2, []common.ColumnDefinition{ + {Field: "field1"}, {Field: "field2"}, {Field: "field3"}, + }) + + bars := &types.APISchema{ + Schema: &schemas.Schema{ID: "test.io.bars"}, + } + attributes.SetGVK(bars, k8sschema.GroupVersionKind{ + Group: "test.io", + Version: "v1", + Kind: "Bar", + }) + attributes.SetGVR(bars, k8sschema.GroupVersionResource{ + Group: "test.io", + Version: "v1", + Resource: "bars", + }) + + tests := []struct { + name string + initialSchemas map[string]*types.APISchema + refreshedSchemas map[string]*types.APISchema + expectedResets map[k8sschema.GroupVersionKind]struct{} + }{ + { + name: "no change", + initialSchemas: map[string]*types.APISchema{ + "configmaps": configmaps, + }, + refreshedSchemas: map[string]*types.APISchema{ + "configmaps": configmaps, + }, + }, + { + name: "single schema added", + initialSchemas: map[string]*types.APISchema{ + "configmaps": configmaps, + }, + refreshedSchemas: map[string]*types.APISchema{ + "configmaps": configmaps, + "pods": pods, + }, + expectedResets: map[k8sschema.GroupVersionKind]struct{}{ + attributes.GVK(pods): {}, + }, + }, + { + name: "multiple schemas added", + initialSchemas: map[string]*types.APISchema{}, + refreshedSchemas: map[string]*types.APISchema{ + "configmaps": configmaps, + "pods": pods, + }, + expectedResets: map[k8sschema.GroupVersionKind]struct{}{ + attributes.GVK(configmaps): {}, + attributes.GVK(pods): {}, + }, + }, + { + name: "single schema removed", + initialSchemas: map[string]*types.APISchema{ + "configmaps": configmaps, + "pods": pods, + }, + refreshedSchemas: map[string]*types.APISchema{ + "pods": pods, + }, + expectedResets: map[k8sschema.GroupVersionKind]struct{}{ + attributes.GVK(configmaps): {}, + }, + }, + { + name: "multiple schemas removed", + initialSchemas: map[string]*types.APISchema{ + "configmaps": configmaps, + "pods": pods, + }, + refreshedSchemas: map[string]*types.APISchema{}, + expectedResets: map[k8sschema.GroupVersionKind]struct{}{ + attributes.GVK(configmaps): {}, + attributes.GVK(pods): {}, + }, + }, + { + name: "field changed", + initialSchemas: map[string]*types.APISchema{ + "test.io.foos": foos1, + }, + refreshedSchemas: map[string]*types.APISchema{ + "test.io.foos": foos2, + }, + expectedResets: map[k8sschema.GroupVersionKind]struct{}{ + attributes.GVK(foos2): {}, + }, + }, + { + name: "added deleted and changed", + initialSchemas: map[string]*types.APISchema{ + "configmaps": configmaps, + "pods": pods, + "test.io.foos": foos1, + }, + refreshedSchemas: map[string]*types.APISchema{ + "configmaps": configmaps, + "test.io.bars": bars, + "test.io.foos": foos2, + }, + expectedResets: map[k8sschema.GroupVersionKind]struct{}{ + attributes.GVK(foos2): {}, + attributes.GVK(pods): {}, + attributes.GVK(bars): {}, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + resetter := &testResetter{} + tracker := NewSchemaTracker(resetter) + collection := schema.NewCollection(context.TODO(), types.EmptyAPISchemas(), nil) + + collection.Reset(test.initialSchemas) + err := tracker.OnSchemas(collection) + assert.NoError(t, err) + + // Reset because we don't care about the initial list of resets + resetter.Resets = nil + + collection.Reset(test.refreshedSchemas) + err = tracker.OnSchemas(collection) + assert.NoError(t, err) + + assert.Equal(t, test.expectedResets, resetter.Resets) + }) + } +} diff --git a/pkg/stores/sqlproxy/proxy_mocks_test.go b/pkg/stores/sqlproxy/proxy_mocks_test.go index 7c8ce5b9..9c063528 100644 --- a/pkg/stores/sqlproxy/proxy_mocks_test.go +++ b/pkg/stores/sqlproxy/proxy_mocks_test.go @@ -294,17 +294,17 @@ func (mr *MockCacheFactoryMockRecorder) DoneWithCache(arg0 any) *gomock.Call { } // Stop mocks base method. -func (m *MockCacheFactory) Stop() error { +func (m *MockCacheFactory) Stop(gvk schema.GroupVersionKind) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Stop") + ret := m.ctrl.Call(m, "Stop", gvk) ret0, _ := ret[0].(error) return ret0 } // Stop indicates an expected call of Stop. -func (mr *MockCacheFactoryMockRecorder) Stop() *gomock.Call { +func (mr *MockCacheFactoryMockRecorder) Stop(gvk any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockCacheFactory)(nil).Stop)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockCacheFactory)(nil).Stop), gvk) } // MockSchemaColumnSetter is a mock of SchemaColumnSetter interface. diff --git a/pkg/stores/sqlproxy/proxy_store.go b/pkg/stores/sqlproxy/proxy_store.go index 7160c4b1..b6ffad67 100644 --- a/pkg/stores/sqlproxy/proxy_store.go +++ b/pkg/stores/sqlproxy/proxy_store.go @@ -302,7 +302,7 @@ type CacheFactoryInitializer func() (CacheFactory, error) type CacheFactory interface { CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, namespaced bool, watchable bool) (*factory.Cache, error) DoneWithCache(*factory.Cache) - Stop() error + Stop(gvk schema.GroupVersionKind) error } // NewProxyStore returns a Store implemented directly on top of kubernetes. @@ -334,18 +334,20 @@ func NewProxyStore(ctx context.Context, c SchemaColumnSetter, clientGetter Clien } // Reset locks the store, resets the underlying cache factory, and warm the namespace cache. -func (s *Store) Reset() error { +func (s *Store) Reset(gvk schema.GroupVersionKind) error { s.lock.Lock() defer s.lock.Unlock() - if s.namespaceCache != nil { + if s.namespaceCache != nil && gvk == namespaceGVK { s.cacheFactory.DoneWithCache(s.namespaceCache) } - if err := s.cacheFactory.Stop(); err != nil { + if err := s.cacheFactory.Stop(gvk); err != nil { return fmt.Errorf("reset: %w", err) } - if err := s.initializeNamespaceCache(); err != nil { - return err + if gvk == namespaceGVK { + if err := s.initializeNamespaceCache(); err != nil { + return err + } } return nil } diff --git a/pkg/stores/sqlproxy/proxy_store_test.go b/pkg/stores/sqlproxy/proxy_store_test.go index 95540aa9..1f5cc563 100644 --- a/pkg/stores/sqlproxy/proxy_store_test.go +++ b/pkg/stores/sqlproxy/proxy_store_test.go @@ -33,6 +33,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" schema2 "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/authentication/user" @@ -726,13 +727,14 @@ func TestReset(t *testing.T) { transformBuilder: tb, } nsSchema := baseNSSchema - cf.EXPECT().Stop().Return(nil) + gvk := attributes.GVK(&nsSchema) + cf.EXPECT().Stop(gvk).Return(nil) cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil) cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil) cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nsc, nil) cf.EXPECT().DoneWithCache(nsc) - tb.EXPECT().GetTransformFunc(attributes.GVK(&nsSchema), gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) - err := s.Reset() + tb.EXPECT().GetTransformFunc(gvk, gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) + err := s.Reset(gvk) assert.Nil(t, err) assert.Equal(t, nsc, s.namespaceCache) }, @@ -754,8 +756,9 @@ func TestReset(t *testing.T) { transformBuilder: tb, } - cf.EXPECT().Stop().Return(fmt.Errorf("error")) - err := s.Reset() + gvk := schema.GroupVersionKind{} + cf.EXPECT().Stop(gvk).Return(fmt.Errorf("error")) + err := s.Reset(gvk) assert.NotNil(t, err) }, }) @@ -776,9 +779,12 @@ func TestReset(t *testing.T) { transformBuilder: tb, } - cf.EXPECT().Stop().Return(nil) + nsSchema := baseNSSchema + gvk := attributes.GVK(&nsSchema) + + cf.EXPECT().Stop(gvk).Return(nil) cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(fmt.Errorf("error")) - err := s.Reset() + err := s.Reset(gvk) assert.NotNil(t, err) }, }) @@ -799,11 +805,12 @@ func TestReset(t *testing.T) { transformBuilder: tb, } nsSchema := baseNSSchema + gvk := attributes.GVK(&nsSchema) - cf.EXPECT().Stop().Return(nil) + cf.EXPECT().Stop(gvk).Return(nil) cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil) cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(nil, fmt.Errorf("error")) - err := s.Reset() + err := s.Reset(gvk) assert.NotNil(t, err) }, }) @@ -825,13 +832,14 @@ func TestReset(t *testing.T) { transformBuilder: tb, } nsSchema := baseNSSchema + gvk := attributes.GVK(&nsSchema) - cf.EXPECT().Stop().Return(nil) + cf.EXPECT().Stop(gvk).Return(nil) cs.EXPECT().SetColumns(gomock.Any(), gomock.Any()).Return(nil) cg.EXPECT().TableAdminClient(nil, &nsSchema, "", &WarningBuffer{}).Return(ri, nil) cf.EXPECT().CacheFor(context.Background(), [][]string{{`id`}, {`metadata`, `state`, `name`}, {"spec", "displayName"}}, gomock.Any(), gomock.Any(), gomock.Any(), &tablelistconvert.Client{ResourceInterface: ri}, attributes.GVK(&nsSchema), false, true).Return(nil, fmt.Errorf("error")) - tb.EXPECT().GetTransformFunc(attributes.GVK(&nsSchema), gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) - err := s.Reset() + tb.EXPECT().GetTransformFunc(gvk, gomock.Any(), false).Return(func(obj interface{}) (interface{}, error) { return obj, nil }) + err := s.Reset(gvk) assert.NotNil(t, err) }, })