diff --git a/pkg/client/cache/index.go b/pkg/client/cache/index.go index 19c5e0650d7..572f2c06b69 100644 --- a/pkg/client/cache/index.go +++ b/pkg/client/cache/index.go @@ -34,6 +34,10 @@ type Indexer interface { ByIndex(indexName, indexKey string) ([]interface{}, error) // GetIndexer return the indexers GetIndexers() Indexers + + // AddIndexers adds more indexers to this store. If you call this after you already have data + // in the store, the results are undefined. + AddIndexers(newIndexers Indexers) error } // IndexFunc knows how to provide an indexed value for an object. diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index b7f6f54cebd..a3b7c92ddc5 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -180,6 +180,10 @@ func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) { return c.cacheStorage.ByIndex(indexName, indexKey) } +func (c *cache) AddIndexers(newIndexers Indexers) error { + return c.cacheStorage.AddIndexers(newIndexers) +} + // Get returns the requested item, or sets exists=false. // Get is completely threadsafe as long as you treat all items as immutable. func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) { diff --git a/pkg/client/cache/thread_safe_store.go b/pkg/client/cache/thread_safe_store.go index b9c6e25b264..6cab6861a85 100644 --- a/pkg/client/cache/thread_safe_store.go +++ b/pkg/client/cache/thread_safe_store.go @@ -46,6 +46,10 @@ type ThreadSafeStore interface { ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers + + // AddIndexers adds more indexers to this store. If you call this after you already have data + // in the store, the results are undefined. + AddIndexers(newIndexers Indexers) error } // threadSafeMap implements ThreadSafeStore @@ -195,6 +199,27 @@ func (c *threadSafeMap) GetIndexers() Indexers { return c.indexers } +func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error { + c.lock.Lock() + defer c.lock.Unlock() + + if len(c.items) > 0 { + return fmt.Errorf("cannot add indexers to running index") + } + + oldKeys := sets.StringKeySet(c.indexers) + newKeys := sets.StringKeySet(newIndexers) + + if oldKeys.HasAny(newKeys.List()...) { + return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys)) + } + + for k, v := range newIndexers { + c.indexers[k] = v + } + return nil +} + // updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj // updateIndices must be called from a function that already has a lock on the cache func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error { diff --git a/pkg/controller/framework/shared_informer.go b/pkg/controller/framework/shared_informer.go index 59f641b9814..ce9ddf2c714 100644 --- a/pkg/controller/framework/shared_informer.go +++ b/pkg/controller/framework/shared_informer.go @@ -57,14 +57,7 @@ type SharedIndexInformer interface { // TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can // be shared amongst all consumers. func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer { - sharedInformer := &sharedIndexInformer{ - processor: &sharedProcessor{}, - indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}), - listerWatcher: lw, - objectType: objType, - fullResyncPeriod: resyncPeriod, - } - return sharedInformer + return NewSharedIndexInformer(lw, objType, resyncPeriod, cache.Indexers{}) } // NewSharedIndexInformer creates a new instance for the listwatcher. @@ -184,17 +177,7 @@ func (s *sharedIndexInformer) AddIndexers(indexers cache.Indexers) error { return fmt.Errorf("informer has already started") } - oldIndexers := s.indexer.GetIndexers() - - for name, indexFunc := range oldIndexers { - if _, exist := indexers[name]; exist { - return fmt.Errorf("there is an index named %s already exist", name) - } - indexers[name] = indexFunc - } - - s.indexer = cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) - return nil + return s.indexer.AddIndexers(indexers) } func (s *sharedIndexInformer) GetController() ControllerInterface {