mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #25671 from deads2k/fix-add-indexer
make addIndexers safe for sharedInformer
This commit is contained in:
commit
6dc1437015
4
pkg/client/cache/index.go
vendored
4
pkg/client/cache/index.go
vendored
@ -34,6 +34,10 @@ type Indexer interface {
|
||||
ByIndex(indexName, indexKey string) ([]interface{}, error)
|
||||
// GetIndexer return the indexers
|
||||
GetIndexers() Indexers
|
||||
|
||||
// AddIndexers adds more indexers to this store. If you call this after you already have data
|
||||
// in the store, the results are undefined.
|
||||
AddIndexers(newIndexers Indexers) error
|
||||
}
|
||||
|
||||
// IndexFunc knows how to provide an indexed value for an object.
|
||||
|
4
pkg/client/cache/store.go
vendored
4
pkg/client/cache/store.go
vendored
@ -180,6 +180,10 @@ func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
|
||||
return c.cacheStorage.ByIndex(indexName, indexKey)
|
||||
}
|
||||
|
||||
func (c *cache) AddIndexers(newIndexers Indexers) error {
|
||||
return c.cacheStorage.AddIndexers(newIndexers)
|
||||
}
|
||||
|
||||
// Get returns the requested item, or sets exists=false.
|
||||
// Get is completely threadsafe as long as you treat all items as immutable.
|
||||
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
|
||||
|
25
pkg/client/cache/thread_safe_store.go
vendored
25
pkg/client/cache/thread_safe_store.go
vendored
@ -46,6 +46,10 @@ type ThreadSafeStore interface {
|
||||
ListIndexFuncValues(name string) []string
|
||||
ByIndex(indexName, indexKey string) ([]interface{}, error)
|
||||
GetIndexers() Indexers
|
||||
|
||||
// AddIndexers adds more indexers to this store. If you call this after you already have data
|
||||
// in the store, the results are undefined.
|
||||
AddIndexers(newIndexers Indexers) error
|
||||
}
|
||||
|
||||
// threadSafeMap implements ThreadSafeStore
|
||||
@ -195,6 +199,27 @@ func (c *threadSafeMap) GetIndexers() Indexers {
|
||||
return c.indexers
|
||||
}
|
||||
|
||||
func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
if len(c.items) > 0 {
|
||||
return fmt.Errorf("cannot add indexers to running index")
|
||||
}
|
||||
|
||||
oldKeys := sets.StringKeySet(c.indexers)
|
||||
newKeys := sets.StringKeySet(newIndexers)
|
||||
|
||||
if oldKeys.HasAny(newKeys.List()...) {
|
||||
return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
|
||||
}
|
||||
|
||||
for k, v := range newIndexers {
|
||||
c.indexers[k] = v
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
|
||||
// updateIndices must be called from a function that already has a lock on the cache
|
||||
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
|
||||
|
@ -57,14 +57,7 @@ type SharedIndexInformer interface {
|
||||
// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
|
||||
// be shared amongst all consumers.
|
||||
func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
|
||||
sharedInformer := &sharedIndexInformer{
|
||||
processor: &sharedProcessor{},
|
||||
indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}),
|
||||
listerWatcher: lw,
|
||||
objectType: objType,
|
||||
fullResyncPeriod: resyncPeriod,
|
||||
}
|
||||
return sharedInformer
|
||||
return NewSharedIndexInformer(lw, objType, resyncPeriod, cache.Indexers{})
|
||||
}
|
||||
|
||||
// NewSharedIndexInformer creates a new instance for the listwatcher.
|
||||
@ -184,17 +177,7 @@ func (s *sharedIndexInformer) AddIndexers(indexers cache.Indexers) error {
|
||||
return fmt.Errorf("informer has already started")
|
||||
}
|
||||
|
||||
oldIndexers := s.indexer.GetIndexers()
|
||||
|
||||
for name, indexFunc := range oldIndexers {
|
||||
if _, exist := indexers[name]; exist {
|
||||
return fmt.Errorf("there is an index named %s already exist", name)
|
||||
}
|
||||
indexers[name] = indexFunc
|
||||
}
|
||||
|
||||
s.indexer = cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
|
||||
return nil
|
||||
return s.indexer.AddIndexers(indexers)
|
||||
}
|
||||
|
||||
func (s *sharedIndexInformer) GetController() ControllerInterface {
|
||||
|
Loading…
Reference in New Issue
Block a user