Updated comments on internal abstractions in client-go/tools/cache

The comments on Store and Queue and the FIFOs ceased being accurate
long ago.

Kubernetes-commit: 7a7ccb797e0c977c8049c1490f5a1f92f0bfbbb2
This commit is contained in:
Mike Spreitzer 2020-01-02 01:53:17 -08:00 committed by Kubernetes Publisher
parent 7ec8a74ae9
commit c26559b124
6 changed files with 115 additions and 62 deletions

View File

@ -26,7 +26,7 @@ import (
"k8s.io/klog" "k8s.io/klog"
) )
// NewDeltaFIFO returns a Store which can be used process changes to items. // NewDeltaFIFO returns a Queue which can be used to process changes to items.
// //
// keyFunc is used to figure out what key an object should have. (It's // keyFunc is used to figure out what key an object should have. (It's
// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.) // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
@ -67,7 +67,9 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
return f return f
} }
// DeltaFIFO is like FIFO, but allows you to process deletes. // DeltaFIFO is like FIFO, but allows the PopProcessFunc to process
// deletes. The accumulator associated with a given object's key is a
// slice of Delta values for that object.
// //
// DeltaFIFO is a producer-consumer queue, where a Reflector is // DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls // intended to be the producer, and the consumer is whatever calls
@ -77,22 +79,25 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
// * You want to process every object change (delta) at most once. // * You want to process every object change (delta) at most once.
// * When you process an object, you want to see everything // * When you process an object, you want to see everything
// that's happened to it since you last processed it. // that's happened to it since you last processed it.
// * You want to process the deletion of objects. // * You want to process the deletion of some of the objects.
// * You might want to periodically reprocess objects. // * You might want to periodically reprocess objects.
// //
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but it // interface{} to satisfy the Store/Queue interfaces, but it
// will always return an object of type Deltas. // will always return an object of type Deltas.
// //
// A DeltaFIFO's knownObjects KeyListerGetter provides get/list access
// to a set of "known objects" that is used for two purposes. One is
// to conditionalize delete operations: it is only for a known object
// that a Delete Delta is recorded (this applies to both Delete and
// Replace). The deleted object will be included in the
// DeleteFinalStateUnknown markers, and those objects could be stale.
// The other purpose is in the Resync operation, which adds a Sync
// Delta for every known object.
//
// A note on threading: If you call Pop() in parallel from multiple // A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly // threads, you could end up with multiple threads processing slightly
// different versions of the same object. // different versions of the same object.
//
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the purpose of figuring out which
// items have been deleted when Replace() or Delete() are called. The deleted
// object will be included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
type DeltaFIFO struct { type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'. // lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex lock sync.RWMutex
@ -187,7 +192,7 @@ func (f *DeltaFIFO) Update(obj interface{}) error {
// Delete is just like Add, but makes an Deleted Delta. If the item does not // Delete is just like Add, but makes an Deleted Delta. If the item does not
// already exist, it will be ignored. (It may have already been deleted by a // already exist, it will be ignored. (It may have already been deleted by a
// Replace (re-list), for example. // Replace (re-list), for example.)
func (f *DeltaFIFO) Delete(obj interface{}) error { func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj) id, err := f.KeyOf(obj)
if err != nil { if err != nil {
@ -313,6 +318,9 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
f.items[id] = newDeltas f.items[id] = newDeltas
f.cond.Broadcast() f.cond.Broadcast()
} else { } else {
// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// But if somehow it ever does return an empty list, then
// We need to remove this from our map (extra items in the queue are // We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map). // ignored if they are not in the map).
delete(f.items, id) delete(f.items, id)

56
tools/cache/fifo.go vendored
View File

@ -24,7 +24,7 @@ import (
) )
// PopProcessFunc is passed to Pop() method of Queue interface. // PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the element popped from the queue. // It is supposed to process the accumulator popped from the queue.
type PopProcessFunc func(interface{}) error type PopProcessFunc func(interface{}) error
// ErrRequeue may be returned by a PopProcessFunc to safely requeue // ErrRequeue may be returned by a PopProcessFunc to safely requeue
@ -44,26 +44,38 @@ func (e ErrRequeue) Error() string {
return e.Err.Error() return e.Err.Error()
} }
// Queue is exactly like a Store, but has a Pop() method too. // Queue extends Store with a collection of keys to "process".
// Every Add, Update, or Delete may put the object's key in that collection.
// A Queue has a way to derive the corresponding key given an accumulator.
// A Queue can be accessed concurrently from multiple goroutines.
// A Queue can be "closed", after which Pop operations return an error.
type Queue interface { type Queue interface {
Store Store
// Pop blocks until it has something to process. // Pop blocks until there is at least one key to process or the
// It returns the object that was process and the result of processing. // Queue is closed. In the latter case Pop returns with an error.
// The PopProcessFunc may return an ErrRequeue{...} to indicate the item // In the former case Pop atomically picks one key to process,
// should be requeued before releasing the lock on the queue. // removes that (key, accumulator) association from the Store, and
// processes the accumulator. Pop returns the accumulator that
// was processed and the result of processing. The PopProcessFunc
// may return an ErrRequeue{inner} and in this case Pop will (a)
// return that (key, accumulator) association to the Queue as part
// of the atomic processing and (b) return the inner error from
// Pop.
Pop(PopProcessFunc) (interface{}, error) Pop(PopProcessFunc) (interface{}, error)
// AddIfNotPresent adds a value previously // AddIfNotPresent puts the given accumulator into the Queue (in
// returned by Pop back into the queue as long // association with the accumulator's key) if and only if that key
// as nothing else (presumably more recent) // is not already associated with a non-empty accumulator.
// has since been added.
AddIfNotPresent(interface{}) error AddIfNotPresent(interface{}) error
// HasSynced returns true if the first batch of items has been popped // HasSynced returns true if the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, Update, or Delete;
// otherwise the first batch is empty.
HasSynced() bool HasSynced() bool
// Close queue // Close the queue
Close() Close()
} }
@ -79,11 +91,16 @@ func Pop(queue Queue) interface{} {
return result return result
} }
// FIFO receives adds and updates from a Reflector, and puts them in a queue for // FIFO is a Queue in which (a) each accumulator is simply the most
// FIFO order processing. If multiple adds/updates of a single item happen while // recently provided object and (b) the collection of keys to process
// an item is in the queue before it has been processed, it will only be // is a FIFO. The accumulators all start out empty, and deleting an
// processed once, and when it is processed, the most recent version will be // object from its accumulator empties the accumulator. The Resync
// processed. This can't be done with a channel. // operation is a no-op.
//
// Thus: if multiple adds/updates of a single object happen while that
// object's key is in the queue before it has been processed then it
// will only be processed once, and when it is processed the most
// recent version will be processed. This can't be done with a channel
// //
// FIFO solves this use case: // FIFO solves this use case:
// * You want to process every object (exactly) once. // * You want to process every object (exactly) once.
@ -94,7 +111,7 @@ func Pop(queue Queue) interface{} {
type FIFO struct { type FIFO struct {
lock sync.RWMutex lock sync.RWMutex
cond sync.Cond cond sync.Cond
// We depend on the property that items in the set are in the queue and vice versa. // We depend on the property that every key in `items` is also in `queue`
items map[string]interface{} items map[string]interface{}
queue []string queue []string
@ -326,7 +343,8 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
return nil return nil
} }
// Resync will touch all objects to put them into the processing queue // Resync will ensure that every object in the Store has its key in the queue.
// This should be a no-op, because that property is maintained by all operations.
func (f *FIFO) Resync() error { func (f *FIFO) Resync() error {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()

View File

@ -36,9 +36,12 @@ func init() {
mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR")) mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
} }
// MutationDetector is able to monitor if the object be modified outside. // MutationDetector is able to monitor objects for mutation within a limited window of time
type MutationDetector interface { type MutationDetector interface {
// AddObject adds the given object to the set being monitored for a while from now
AddObject(obj interface{}) AddObject(obj interface{})
// Run starts the monitoring and does not return until the monitoring is stopped.
Run(stopCh <-chan struct{}) Run(stopCh <-chan struct{})
} }

View File

@ -168,21 +168,21 @@ type SharedIndexInformer interface {
} }
// NewSharedInformer creates a new instance for the listwatcher. // NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer { func NewSharedInformer(lw ListerWatcher, objOfWatchedType runtime.Object, resyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{}) return NewSharedIndexInformer(lw, objOfWatchedType, resyncPeriod, Indexers{})
} }
// NewSharedIndexInformer creates a new instance for the listwatcher. // NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { func NewSharedIndexInformer(lw ListerWatcher, objOfWatchedType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{} realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{ sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock}, processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw, listerWatcher: lw,
objectType: objType, objectType: objOfWatchedType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod, resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objOfWatchedType)),
clock: realClock, clock: realClock,
} }
return sharedIndexInformer return sharedIndexInformer
@ -244,7 +244,6 @@ type sharedIndexInformer struct {
processor *sharedProcessor processor *sharedProcessor
cacheMutationDetector MutationDetector cacheMutationDetector MutationDetector
// This block is tracked to handle late initialization of the controller
listerWatcher ListerWatcher listerWatcher ListerWatcher
objectType runtime.Object objectType runtime.Object

39
tools/cache/store.go vendored
View File

@ -23,27 +23,52 @@ import (
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
) )
// Store is a generic object storage interface. Reflector knows how to watch a server // Store is a generic object storage and processing interface. A
// and update a store. A generic store is provided, which allows Reflector to be used // Store holds a map from string keys to accumulators, and has
// as a local caching system, and an LRU store, which allows Reflector to work like a // operations to add, update, and delete a given object to/from the
// queue of items yet to be processed. // accumulator currently associated with a given key. A Store also
// knows how to extract the key from a given object, so many operations
// are given only the object.
// //
// Store makes no assumptions about stored object identity; it is the responsibility // In the simplest Store implementations each accumulator is simply
// of a Store implementation to provide a mechanism to correctly key objects and to // the last given object and thus the Store's behavior is simple
// define the contract for obtaining objects by some arbitrary key type. // storage.
//
// Reflector knows how to watch a server and update a Store. This
// package provides a variety of implementations of Store.
type Store interface { type Store interface {
// Add adds the given object to the accumulator associated with the given object's key
Add(obj interface{}) error Add(obj interface{}) error
// Update updates the given object in the accumulator associated with the given object's key
Update(obj interface{}) error Update(obj interface{}) error
// Delete deletes the given object from the accumulator associated with the given object's key
Delete(obj interface{}) error Delete(obj interface{}) error
// List returns a list of all the currently non-empty accumulators
List() []interface{} List() []interface{}
// ListKeys returns a list of all the keys currently associated with non-empty accumulators
ListKeys() []string ListKeys() []string
// Get returns the accumulator associated with the given object's key
Get(obj interface{}) (item interface{}, exists bool, err error) Get(obj interface{}) (item interface{}, exists bool, err error)
// GetByKey returns the accumulator associated with the given key
GetByKey(key string) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the // Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference // given list. Store takes ownership of the list, you should not reference
// it after calling this function. // it after calling this function.
Replace([]interface{}, string) error Replace([]interface{}, string) error
// Resync is meaningless in the terms appearing here but has
// meaning in some implementations that have non-trivial
// additional behavior. In general the idea is to tee up the
// current non-empty accumulators or their keys for
// reconsideration (whatever that means).
Resync() error Resync() error
} }

View File

@ -131,8 +131,8 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
} }
} }
// Index returns a list of items that match on the index function // Index returns a list of items that match the given object on the index function.
// Index is thread-safe so long as you treat all items as immutable // Index is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) { func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock() c.lock.RLock()
defer c.lock.RUnlock() defer c.lock.RUnlock()
@ -142,37 +142,37 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
return nil, fmt.Errorf("Index with name %s does not exist", indexName) return nil, fmt.Errorf("Index with name %s does not exist", indexName)
} }
indexKeys, err := indexFunc(obj) indexedValues, err := indexFunc(obj)
if err != nil { if err != nil {
return nil, err return nil, err
} }
index := c.indices[indexName] index := c.indices[indexName]
var returnKeySet sets.String var storeKeySet sets.String
if len(indexKeys) == 1 { if len(indexedValues) == 1 {
// In majority of cases, there is exactly one value matching. // In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here. // Optimize the most common path - deduping is not needed here.
returnKeySet = index[indexKeys[0]] storeKeySet = index[indexedValues[0]]
} else { } else {
// Need to de-dupe the return list. // Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen. // Since multiple keys are allowed, this can happen.
returnKeySet = sets.String{} storeKeySet = sets.String{}
for _, indexKey := range indexKeys { for _, indexedValue := range indexedValues {
for key := range index[indexKey] { for key := range index[indexedValue] {
returnKeySet.Insert(key) storeKeySet.Insert(key)
} }
} }
} }
list := make([]interface{}, 0, returnKeySet.Len()) list := make([]interface{}, 0, storeKeySet.Len())
for absoluteKey := range returnKeySet { for storeKey := range storeKeySet {
list = append(list, c.items[absoluteKey]) list = append(list, c.items[storeKey])
} }
return list, nil return list, nil
} }
// ByIndex returns a list of items that match an exact value on the index function // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) { func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock() c.lock.RLock()
defer c.lock.RUnlock() defer c.lock.RUnlock()
@ -183,7 +183,7 @@ func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, erro
index := c.indices[indexName] index := c.indices[indexName]
set := index[indexKey] set := index[indexedValue]
list := make([]interface{}, 0, set.Len()) list := make([]interface{}, 0, set.Len())
for key := range set { for key := range set {
list = append(list, c.items[key]) list = append(list, c.items[key])
@ -192,9 +192,9 @@ func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, erro
return list, nil return list, nil
} }
// IndexKeys returns a list of keys that match on the index function. // IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
// IndexKeys is thread-safe so long as you treat all items as immutable. // IndexKeys is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) { func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
c.lock.RLock() c.lock.RLock()
defer c.lock.RUnlock() defer c.lock.RUnlock()
@ -205,7 +205,7 @@ func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error)
index := c.indices[indexName] index := c.indices[indexName]
set := index[indexKey] set := index[indexedValue]
return set.List(), nil return set.List(), nil
} }