Merge pull request #86774 from MikeSpreitzer/more-informer-updates

Cleanup comments and names in client-go/tools/cache

Kubernetes-commit: 083f58a1e4684d207a7a4858e9a25c2e120bbc20
commit ded4aaa1cc
Author: Kubernetes Publisher
Date:   2020-01-14 17:21:04 -08:00
11 changed files with 342 additions and 138 deletions

tools/cache/controller.go vendored

@@ -26,7 +26,16 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 )
 
-// Config contains all the settings for a Controller.
+// This file implements a low-level controller that is used in
+// sharedIndexInformer, which is an implementation of
+// SharedIndexInformer.  Such informers, in turn, are key components
+// in the high level controllers that form the backbone of the
+// Kubernetes control plane.  Look at those for examples, or the
+// example in
+// https://github.com/kubernetes/client-go/tree/master/examples/workqueue
+// .
+
+// Config contains all the settings for one of these low-level controllers.
 type Config struct {
 	// The queue for your objects - has to be a DeltaFIFO due to
 	// assumptions in the implementation. Your Process() function
@@ -36,30 +45,29 @@ type Config struct {
 	// Something that can list and watch your objects.
 	ListerWatcher
 
-	// Something that can process your objects.
+	// Something that can process a popped Deltas.
 	Process ProcessFunc
 
-	// The type of your objects.
+	// ObjectType is an example object of the type this controller is
+	// expected to handle.  Only the type needs to be right, except
+	// that when that is `unstructured.Unstructured` the object's
+	// `"apiVersion"` and `"kind"` must also be right.
 	ObjectType runtime.Object
 
-	// Reprocess everything at least this often.
-	// Note that if it takes longer for you to clear the queue than this
-	// period, you will end up processing items in the order determined
-	// by FIFO.Replace(). Currently, this is random. If this is a
-	// problem, we can change that replacement policy to append new
-	// things to the end of the queue instead of replacing the entire
-	// queue.
+	// FullResyncPeriod is the period at which ShouldResync is considered.
 	FullResyncPeriod time.Duration
 
-	// ShouldResync, if specified, is invoked when the controller's reflector determines the next
-	// periodic sync should occur. If this returns true, it means the reflector should proceed with
-	// the resync.
+	// ShouldResync is periodically used by the reflector to determine
+	// whether to Resync the Queue.  If ShouldResync is `nil` or
+	// returns true, it means the reflector should proceed with the
+	// resync.
 	ShouldResync ShouldResyncFunc
 
 	// If true, when Process() returns an error, re-enqueue the object.
 	// TODO: add interface to let you inject a delay/backoff or drop
 	// the object completely if desired. Pass the object in
-	// question to this interface as a parameter.
+	// question to this interface as a parameter.  This is probably moot
+	// now that this functionality appears at a higher level.
 	RetryOnError bool
 }
@@ -71,7 +79,7 @@ type ShouldResyncFunc func() bool
 // ProcessFunc processes a single object.
 type ProcessFunc func(obj interface{}) error
 
-// Controller is a generic controller framework.
+// `*controller` implements Controller
 type controller struct {
 	config    Config
 	reflector *Reflector
@@ -79,10 +87,22 @@ type controller struct {
 	clock          clock.Clock
 }
 
-// Controller is a generic controller framework.
+// Controller is a low-level controller that is parameterized by a
+// Config and used in sharedIndexInformer.
 type Controller interface {
+	// Run does two things.  One is to construct and run a Reflector
+	// to pump objects/notifications from the Config's ListerWatcher
+	// to the Config's Queue and possibly invoke the occasional Resync
+	// on that Queue.  The other is to repeatedly Pop from the Queue
+	// and process with the Config's ProcessFunc.  Both of these
+	// continue until `stopCh` is closed.
 	Run(stopCh <-chan struct{})
+
+	// HasSynced delegates to the Config's Queue
 	HasSynced() bool
+
+	// LastSyncResourceVersion delegates to the Reflector when there
+	// is one, otherwise returns the empty string
 	LastSyncResourceVersion() string
 }
@@ -95,7 +115,7 @@ func New(c *Config) Controller {
 	return ctlr
 }
 
-// Run begins processing items, and will continue until a value is sent down stopCh.
+// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
 // It's an error to call Run more than once.
 // Run blocks; call via go.
 func (c *controller) Run(stopCh <-chan struct{}) {
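
As orientation for this low-level API, here is a hedged sketch (not part of this commit) of wiring a Config and running the controller directly; most code should use a SharedIndexInformer instead, and the clientset plumbing and function name here are assumptions:

```go
package sketch

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// runLowLevelController is illustrative only; `clientset` construction is assumed.
func runLowLevelController(clientset *kubernetes.Clientset, stopCh <-chan struct{}) {
	// The Queue must be a DeltaFIFO, per Config's documentation.  Its
	// knownObjects is nil here because we keep no separate local cache.
	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil)

	cfg := &cache.Config{
		Queue: fifo,
		ListerWatcher: cache.NewListWatchFromClient(
			clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything()),
		ObjectType:       &v1.Pod{}, // the example object: only the type matters
		FullResyncPeriod: 30 * time.Second,
		// Process receives each Deltas value popped from the DeltaFIFO.
		Process: func(obj interface{}) error {
			for _, d := range obj.(cache.Deltas) {
				fmt.Printf("%s: %v\n", d.Type, d.Object)
			}
			return nil
		},
	}
	cache.New(cfg).Run(stopCh) // blocks until stopCh is closed
}
```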

tools/cache/delta_fifo.go vendored

@@ -26,15 +26,15 @@ import (
 	"k8s.io/klog"
 )
 
-// NewDeltaFIFO returns a Store which can be used process changes to items.
+// NewDeltaFIFO returns a Queue which can be used to process changes to items.
 //
-// keyFunc is used to figure out what key an object should have. (It's
+// keyFunc is used to figure out what key an object should have. (It is
 // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
 //
-// 'keyLister' is expected to return a list of keys that the consumer of
-// this queue "knows about". It is used to decide which items are missing
-// when Replace() is called; 'Deleted' deltas are produced for these items.
-// It may be nil if you don't need to detect all deletions.
+// 'knownObjects' may be supplied to modify the behavior of Delete,
+// Replace, and Resync.  It may be nil if you do not need those
+// modifications.
+//
 // TODO: consider merging keyLister with this object, tracking a list of
 // "known" keys when Pop() is called. Have to think about how that
 // affects error retrying.
@@ -67,7 +67,18 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
 	return f
 }
 
-// DeltaFIFO is like FIFO, but allows you to process deletes.
+// DeltaFIFO is like FIFO, but differs in two ways.  One is that the
+// accumulator associated with a given object's key is not that object
+// but rather a Deltas, which is a slice of Delta values for that
+// object.  Applying an object to a Deltas means to append a Delta
+// except when the potentially appended Delta is a Deleted and the
+// Deltas already ends with a Deleted.  In that case the Deltas does
+// not grow, although the terminal Deleted will be replaced by the new
+// Deleted if the older Deleted's object is a
+// DeletedFinalStateUnknown.
+//
+// The other difference is that DeltaFIFO has an additional way that
+// an object can be applied to an accumulator, called Sync.
 //
 // DeltaFIFO is a producer-consumer queue, where a Reflector is
 // intended to be the producer, and the consumer is whatever calls
@@ -77,22 +88,22 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
 //  * You want to process every object change (delta) at most once.
 //  * When you process an object, you want to see everything
 //    that's happened to it since you last processed it.
-//  * You want to process the deletion of objects.
+//  * You want to process the deletion of some of the objects.
 //  * You might want to periodically reprocess objects.
 //
 // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
-// interface{} to satisfy the Store/Queue interfaces, but it
+// interface{} to satisfy the Store/Queue interfaces, but they
 // will always return an object of type Deltas.
 //
+// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
+// to list Store keys and to get objects by Store key.  The objects in
+// question are called "known objects" and this set of objects
+// modifies the behavior of the Delete, Replace, and Resync methods
+// (each in a different way).
+//
 // A note on threading: If you call Pop() in parallel from multiple
 // threads, you could end up with multiple threads processing slightly
 // different versions of the same object.
-//
-// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
-// to list keys that are "known", for the purpose of figuring out which
-// items have been deleted when Replace() or Delete() are called. The deleted
-// object will be included in the DeleteFinalStateUnknown markers. These objects
-// could be stale.
 type DeltaFIFO struct {
 	// lock/cond protects access to 'items' and 'queue'.
 	lock sync.RWMutex
@@ -114,9 +125,8 @@ type DeltaFIFO struct {
 	// insertion and retrieval, and should be deterministic.
 	keyFunc KeyFunc
 
-	// knownObjects list keys that are "known", for the
-	// purpose of figuring out which items have been deleted
-	// when Replace() or Delete() is called.
+	// knownObjects list keys that are "known" --- affecting Delete(),
+	// Replace(), and Resync()
 	knownObjects KeyListerGetter
 
 	// Indication the queue is closed.
@@ -185,9 +195,11 @@ func (f *DeltaFIFO) Update(obj interface{}) error {
 	return f.queueActionLocked(Updated, obj)
 }
 
-// Delete is just like Add, but makes an Deleted Delta. If the item does not
-// already exist, it will be ignored. (It may have already been deleted by a
-// Replace (re-list), for example.
+// Delete is just like Add, but makes a Deleted Delta. If the given
+// object does not already exist, it will be ignored. (It may have
+// already been deleted by a Replace (re-list), for example.)  In this
+// method `f.knownObjects`, if not nil, provides (via GetByKey)
+// _additional_ objects that are considered to already exist.
 func (f *DeltaFIFO) Delete(obj interface{}) error {
 	id, err := f.KeyOf(obj)
 	if err != nil {
@@ -313,6 +325,9 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
 		f.items[id] = newDeltas
 		f.cond.Broadcast()
 	} else {
+		// This never happens, because dedupDeltas never returns an empty list
+		// when given a non-empty list (as it is here).
+		// But if somehow it ever does return an empty list, then
 		// We need to remove this from our map (extra items in the queue are
 		// ignored if they are not in the map).
 		delete(f.items, id)
@@ -430,10 +445,16 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
 	}
 }
 
-// Replace will delete the contents of 'f', using instead the given map.
-// 'f' takes ownership of the map, you should not reference the map again
-// after calling this function. f's queue is reset, too; upon return, it
-// will contain the items in the map, in no particular order.
+// Replace atomically does two things: (1) it adds the given objects
+// using the Sync type of Delta and then (2) it does some deletions.
+// In particular: for every pre-existing key K that is not the key of
+// an object in `list` there is the effect of
+// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
+// of K.  If `f.knownObjects == nil` then the pre-existing keys are
+// those in `f.items` and the current object of K is the `.Newest()`
+// of the Deltas associated with K.  Otherwise the pre-existing keys
+// are those listed by `f.knownObjects` and the current object of K is
+// what `f.knownObjects.GetByKey(K)` returns.
 func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
 	f.lock.Lock()
 	defer f.lock.Unlock()
@@ -507,7 +528,9 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
 	return nil
 }
 
-// Resync will send a sync event for each item
+// Resync adds, with a Sync type of Delta, every object listed by
+// `f.knownObjects` whose key is not already queued for processing.
+// If `f.knownObjects` is `nil` then Resync does nothing.
 func (f *DeltaFIFO) Resync() error {
 	f.lock.Lock()
 	defer f.lock.Unlock()
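
The Replace semantics described in the new comment can be seen in a small sketch; the `item` type and `itemKey` function are hypothetical stand-ins:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// item is a hypothetical keyed object, used only for illustration.
type item struct{ name string }

func itemKey(obj interface{}) (string, error) {
	return obj.(item).name, nil
}

func main() {
	// nil knownObjects: the pre-existing keys are those in f.items.
	f := cache.NewDeltaFIFO(itemKey, nil)
	f.Add(item{"a"}) // Deltas for "a": [{Added, a}]

	// "b" gets a Sync Delta; "a" was pre-existing but is absent from the
	// replacement list, so it gets a Deleted Delta whose object is a
	// DeletedFinalStateUnknown tombstone.
	f.Replace([]interface{}{item{"b"}}, "v1")

	for i := 0; i < 2; i++ {
		fmt.Printf("%#v\n", cache.Pop(f).(cache.Deltas))
	}
}
```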

tools/cache/delta_fifo_test.go vendored

@@ -28,11 +28,15 @@ func testPop(f *DeltaFIFO) testFifoObject {
 	return Pop(f).(Deltas).Newest().Object.(testFifoObject)
 }
 
-// keyLookupFunc adapts a raw function to be a KeyLookup.
-type keyLookupFunc func() []testFifoObject
+// literalListerGetter is a KeyListerGetter that is based on a
+// function that returns a slice of objects to list and get.
+// The function must list the same objects every time.
+type literalListerGetter func() []testFifoObject
+
+var _ KeyListerGetter = literalListerGetter(nil)
 
 // ListKeys just calls kl.
-func (kl keyLookupFunc) ListKeys() []string {
+func (kl literalListerGetter) ListKeys() []string {
 	result := []string{}
 	for _, fifoObj := range kl() {
 		result = append(result, fifoObj.name)
@@ -41,7 +45,7 @@ func (kl keyLookupFunc) ListKeys() []string {
 }
 
 // GetByKey returns the key if it exists in the list returned by kl.
-func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
+func (kl literalListerGetter) GetByKey(key string) (interface{}, bool, error) {
 	for _, v := range kl() {
 		if v.name == key {
 			return v, true, nil
@@ -95,7 +99,7 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) {
 	oldObj := mkFifoObj("foo", 1)
 	newObj := mkFifoObj("foo", 2)
-	f := NewDeltaFIFO(testFifoObjectKeyFunc, keyLookupFunc(func() []testFifoObject {
+	f := NewDeltaFIFO(testFifoObjectKeyFunc, literalListerGetter(func() []testFifoObject {
 		return []testFifoObject{oldObj}
 	}))
@@ -218,7 +222,7 @@ func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
 func TestDeltaFIFO_enqueueingWithLister(t *testing.T) {
 	f := NewDeltaFIFO(
 		testFifoObjectKeyFunc,
-		keyLookupFunc(func() []testFifoObject {
+		literalListerGetter(func() []testFifoObject {
 			return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
 		}),
 	)
@@ -268,7 +272,7 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
 func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
 	f := NewDeltaFIFO(
 		testFifoObjectKeyFunc,
-		keyLookupFunc(func() []testFifoObject {
+		literalListerGetter(func() []testFifoObject {
 			return []testFifoObject{mkFifoObj("foo", 5)}
 		}),
 	)
@@ -287,7 +291,7 @@ func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
 func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
 	f := NewDeltaFIFO(
 		testFifoObjectKeyFunc,
-		keyLookupFunc(func() []testFifoObject {
+		literalListerGetter(func() []testFifoObject {
 			return []testFifoObject{}
 		}),
 	)
@@ -304,9 +308,13 @@ func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
 }
 
 func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
+	// We test with only one pre-existing object because there is no
+	// promise about how their deletes are ordered.
+
+	// Try it with a pre-existing Delete
 	f := NewDeltaFIFO(
 		testFifoObjectKeyFunc,
-		keyLookupFunc(func() []testFifoObject {
+		literalListerGetter(func() []testFifoObject {
 			return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
 		}),
 	)
@@ -327,12 +335,59 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
 			t.Errorf("Expected %#v, got %#v", e, a)
 		}
 	}
+
+	// Now try starting with an Add instead of a Delete
+	f = NewDeltaFIFO(
+		testFifoObjectKeyFunc,
+		literalListerGetter(func() []testFifoObject {
+			return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
+		}),
+	)
+	f.Add(mkFifoObj("baz", 10))
+	f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
+	expectedList = []Deltas{
+		{{Added, mkFifoObj("baz", 10)},
+			{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
+		{{Sync, mkFifoObj("foo", 5)}},
+		// Since "bar" didn't have a delete event and wasn't in the Replace list
+		// it should get a tombstone key with the right Obj.
+		{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},
+	}
+	for _, expected := range expectedList {
+		cur := Pop(f).(Deltas)
+		if e, a := expected, cur; !reflect.DeepEqual(e, a) {
+			t.Errorf("Expected %#v, got %#v", e, a)
+		}
+	}
+
+	// Now try starting without an explicit KeyListerGetter
+	f = NewDeltaFIFO(
+		testFifoObjectKeyFunc,
+		nil,
+	)
+	f.Add(mkFifoObj("baz", 10))
+	f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
+	expectedList = []Deltas{
+		{{Added, mkFifoObj("baz", 10)},
+			{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}},
+		{{Sync, mkFifoObj("foo", 5)}},
+	}
+	for _, expected := range expectedList {
+		cur := Pop(f).(Deltas)
+		if e, a := expected, cur; !reflect.DeepEqual(e, a) {
+			t.Errorf("Expected %#v, got %#v", e, a)
+		}
+	}
 }
 
 func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
 	f := NewDeltaFIFO(
 		testFifoObjectKeyFunc,
-		keyLookupFunc(func() []testFifoObject {
+		literalListerGetter(func() []testFifoObject {
 			return []testFifoObject{mkFifoObj("foo", 5)}
 		}),
 	)
@@ -354,7 +409,7 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
 func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) {
 	f := NewDeltaFIFO(
 		testFifoObjectKeyFunc,
-		keyLookupFunc(func() []testFifoObject {
+		literalListerGetter(func() []testFifoObject {
 			return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
 		}),
 	)

tools/cache/expiration_cache.go vendored

@@ -194,9 +194,9 @@ func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) er
 	return nil
 }
 
-// Resync will touch all objects to put them into the processing queue
+// Resync is a no-op for one of these
 func (c *ExpirationCache) Resync() error {
-	return c.cacheStorage.Resync()
+	return nil
 }
 
 // NewTTLStore creates and returns a ExpirationCache with a TTLPolicy

tools/cache/fifo.go vendored

@@ -24,7 +24,7 @@ import (
 )
 
 // PopProcessFunc is passed to Pop() method of Queue interface.
-// It is supposed to process the element popped from the queue.
+// It is supposed to process the accumulator popped from the queue.
 type PopProcessFunc func(interface{}) error
// ErrRequeue may be returned by a PopProcessFunc to safely requeue // ErrRequeue may be returned by a PopProcessFunc to safely requeue
@@ -44,26 +44,38 @@ func (e ErrRequeue) Error() string {
 	return e.Err.Error()
 }
 
-// Queue is exactly like a Store, but has a Pop() method too.
+// Queue extends Store with a collection of Store keys to "process".
+// Every Add, Update, or Delete may put the object's key in that collection.
+// A Queue has a way to derive the corresponding key given an accumulator.
+// A Queue can be accessed concurrently from multiple goroutines.
+// A Queue can be "closed", after which Pop operations return an error.
 type Queue interface {
 	Store
 
-	// Pop blocks until it has something to process.
-	// It returns the object that was process and the result of processing.
-	// The PopProcessFunc may return an ErrRequeue{...} to indicate the item
-	// should be requeued before releasing the lock on the queue.
+	// Pop blocks until there is at least one key to process or the
+	// Queue is closed.  In the latter case Pop returns with an error.
+	// In the former case Pop atomically picks one key to process,
+	// removes that (key, accumulator) association from the Store, and
+	// processes the accumulator.  Pop returns the accumulator that
+	// was processed and the result of processing.  The PopProcessFunc
+	// may return an ErrRequeue{inner} and in this case Pop will (a)
+	// return that (key, accumulator) association to the Queue as part
+	// of the atomic processing and (b) return the inner error from
+	// Pop.
 	Pop(PopProcessFunc) (interface{}, error)
 
-	// AddIfNotPresent adds a value previously
-	// returned by Pop back into the queue as long
-	// as nothing else (presumably more recent)
-	// has since been added.
+	// AddIfNotPresent puts the given accumulator into the Queue (in
+	// association with the accumulator's key) if and only if that key
+	// is not already associated with a non-empty accumulator.
 	AddIfNotPresent(interface{}) error
 
-	// HasSynced returns true if the first batch of items has been popped
+	// HasSynced returns true if the first batch of keys have all been
+	// popped.  The first batch of keys are those of the first Replace
+	// operation if that happened before any Add, Update, or Delete;
+	// otherwise the first batch is empty.
 	HasSynced() bool
 
-	// Close queue
+	// Close the queue
 	Close()
 }
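
A minimal sketch of the Pop contract documented above, assuming a hypothetical `ready` predicate:

```go
package sketch

import (
	"errors"

	"k8s.io/client-go/tools/cache"
)

var errNotReady = errors.New("not ready yet")

// processOne pops one accumulator from the queue.  If the hypothetical
// `ready` predicate fails, ErrRequeue atomically puts the (key,
// accumulator) association back and Pop returns the inner error.
func processOne(q cache.Queue, ready func(interface{}) bool) (interface{}, error) {
	return q.Pop(func(accum interface{}) error {
		if !ready(accum) {
			return cache.ErrRequeue{Err: errNotReady}
		}
		return nil // processed successfully
	})
}
```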
@@ -79,11 +91,16 @@ func Pop(queue Queue) interface{} {
 	return result
 }
 
-// FIFO receives adds and updates from a Reflector, and puts them in a queue for
-// FIFO order processing. If multiple adds/updates of a single item happen while
-// an item is in the queue before it has been processed, it will only be
-// processed once, and when it is processed, the most recent version will be
-// processed. This can't be done with a channel.
+// FIFO is a Queue in which (a) each accumulator is simply the most
+// recently provided object and (b) the collection of keys to process
+// is a FIFO.  The accumulators all start out empty, and deleting an
+// object from its accumulator empties the accumulator.  The Resync
+// operation is a no-op.
+//
+// Thus: if multiple adds/updates of a single object happen while that
+// object's key is in the queue before it has been processed then it
+// will only be processed once, and when it is processed the most
+// recent version will be processed.  This can't be done with a channel.
 //
 // FIFO solves this use case:
 //  * You want to process every object (exactly) once.
@@ -94,7 +111,7 @@ func Pop(queue Queue) interface{} {
 type FIFO struct {
 	lock sync.RWMutex
 	cond sync.Cond
-	// We depend on the property that items in the set are in the queue and vice versa.
+	// We depend on the property that every key in `items` is also in `queue`
 	items map[string]interface{}
 	queue []string
@@ -326,7 +343,8 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
 	return nil
 }
 
-// Resync will touch all objects to put them into the processing queue
+// Resync will ensure that every object in the Store has its key in the queue.
+// This should be a no-op, because that property is maintained by all operations.
 func (f *FIFO) Resync() error {
 	f.lock.Lock()
 	defer f.lock.Unlock()
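
A short sketch of the coalescing behavior the new FIFO comment promises, with a hypothetical `item` type:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// item is a hypothetical keyed object with a version, for illustration.
type item struct{ name, version string }

func main() {
	f := cache.NewFIFO(func(obj interface{}) (string, error) {
		return obj.(item).name, nil
	})
	f.Add(item{"x", "v1"})    // key "x" enters the processing queue
	f.Update(item{"x", "v2"}) // same key: accumulator replaced, key not re-queued
	fmt.Println(cache.Pop(f)) // processes "x" exactly once, seeing only v2
}
```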

tools/cache/index.go vendored

@@ -23,12 +23,15 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 )
 
-// Indexer is a storage interface that lets you list objects using multiple indexing functions.
-// There are three kinds of strings here.
-// One is a storage key, as defined in the Store interface.
-// Another kind is a name of an index.
-// The third kind of string is an "indexed value", which is produced by an
-// IndexFunc and can be a field value or any other string computed from the object.
+// Indexer extends Store with multiple indices and restricts each
+// accumulator to simply hold the current object (and be empty after
+// Delete).
+//
+// There are three kinds of strings here:
+// 1. a storage key, as defined in the Store interface,
+// 2. a name of an index, and
+// 3. an "indexed value", which is produced by an IndexFunc and
+//    can be a field value or any other string computed from the object.
 type Indexer interface {
 	Store
 	// Index returns the stored objects whose set of indexed values
tools/cache/mutation_detector.go vendored

@@ -36,9 +36,12 @@ func init() {
 	mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
 }
 
-// MutationDetector is able to monitor if the object be modified outside.
+// MutationDetector is able to monitor objects for mutation within a limited window of time
 type MutationDetector interface {
+	// AddObject adds the given object to the set being monitored for a while from now
 	AddObject(obj interface{})
+
+	// Run starts the monitoring and does not return until the monitoring is stopped.
 	Run(stopCh <-chan struct{})
 }

tools/cache/reflector.go vendored

@@ -55,7 +55,10 @@ type Reflector struct {
 	// stringification of expectedType otherwise. It is for display
 	// only, and should not be used for parsing or comparison.
 	expectedTypeName string
-	// The type of object we expect to place in the store.
+	// An example object of the type we expect to place in the store.
+	// Only the type needs to be right, except that when that is
+	// `unstructured.Unstructured` the object's `"apiVersion"` and
+	// `"kind"` must also be right.
 	expectedType reflect.Type
 	// The GVK of the object we expect to place in the store if unstructured.
 	expectedGVK *schema.GroupVersionKind
@@ -63,10 +66,12 @@ type Reflector struct {
 	store Store
 	// listerWatcher is used to perform lists and watches.
 	listerWatcher ListerWatcher
-	// period controls timing between one watch ending and
-	// the beginning of the next one.
+	// period controls timing between an unsuccessful watch ending and
+	// the beginning of the next list.
 	period       time.Duration
+	// The period at which ShouldResync is invoked
 	resyncPeriod time.Duration
+	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
 	ShouldResync func() bool
 	// clock allows tests to manipulate time
 	clock clock.Clock
@@ -98,12 +103,16 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
 	return indexer, reflector
 }
 
-// NewReflector creates a new Reflector object which will keep the given store up to
-// date with the server's contents for the given resource. Reflector promises to
-// only put things in the store that have the type of expectedType, unless expectedType
-// is nil. If resyncPeriod is non-zero, then lists will be executed after every
-// resyncPeriod, so that you can use reflectors to periodically process everything as
-// well as incrementally processing the things that change.
+// NewReflector creates a new Reflector object which will keep the
+// given store up to date with the server's contents for the given
+// resource. Reflector promises to only put things in the store that
+// have the type of expectedType, unless expectedType is nil. If
+// resyncPeriod is non-zero, then the reflector will periodically
+// consult its ShouldResync function to determine whether to invoke
+// the Store's Resync operation; `ShouldResync==nil` means always
+// "yes".  This enables you to use reflectors to periodically process
+// everything as well as incrementally processing the things that
+// change.
 func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
 	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
 }
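
A hedged sketch of NewReflector as documented above; the clientset construction and the function name are assumed:

```go
package sketch

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// runPodReflector keeps a plain Store in sync with the server's Pods.
func runPodReflector(clientset *kubernetes.Clientset, stopCh <-chan struct{}) cache.Store {
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	lw := cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
	// resyncPeriod 0: ShouldResync is never consulted and Resync never runs.
	r := cache.NewReflector(lw, &v1.Pod{}, store, 0)
	go r.Run(stopCh)
	return store
}
```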
@@ -147,7 +156,8 @@ func (r *Reflector) setExpectedType(expectedType interface{}) {
 // call chains to NewReflector, so they'd be low entropy names for reflectors
 var internalPackages = []string{"client-go/tools/cache/"}
 
-// Run starts a watch and handles watch events. Will restart the watch if it is closed.
+// Run repeatedly uses the reflector's ListAndWatch to fetch all the
+// objects and subsequent deltas.
 // Run will exit when stopCh is closed.
 func (r *Reflector) Run(stopCh <-chan struct{}) {
 	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
tools/cache/shared_informer.go vendored

@@ -144,7 +144,7 @@ type SharedInformer interface {
 	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
 	// GetStore returns the informer's local cache as a Store.
 	GetStore() Store
-	// GetController gives back a synthetic interface that "votes" to start the informer
+	// GetController is deprecated, it does nothing useful
 	GetController() Controller
 	// Run starts and runs the shared informer, returning after it stops.
 	// The informer will be stopped when stopCh is closed.
@@ -168,21 +168,21 @@ type SharedIndexInformer interface {
 }
 
 // NewSharedInformer creates a new instance for the listwatcher.
-func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
-	return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
+func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, resyncPeriod time.Duration) SharedInformer {
+	return NewSharedIndexInformer(lw, exampleObject, resyncPeriod, Indexers{})
 }
 
 // NewSharedIndexInformer creates a new instance for the listwatcher.
-func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
+func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
 	realClock := &clock.RealClock{}
 	sharedIndexInformer := &sharedIndexInformer{
 		processor:                       &sharedProcessor{clock: realClock},
 		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
 		listerWatcher:                   lw,
-		objectType:                      objType,
+		objectType:                      exampleObject,
 		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
 		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
-		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
+		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
 		clock:                           realClock,
 	}
 	return sharedIndexInformer
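
For contrast with the low-level Controller, a hedged sketch of this usual entry point; the clientset construction and the function name are assumed:

```go
package sketch

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

func runPodInformer(clientset *kubernetes.Clientset, stopCh <-chan struct{}) {
	lw := cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())
	// The second argument is exactly the "example object" described above.
	informer := cache.NewSharedIndexInformer(lw, &v1.Pod{}, 30*time.Second,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { fmt.Println("add", obj.(*v1.Pod).Name) },
		UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("update", newObj.(*v1.Pod).Name) },
		DeleteFunc: func(obj interface{}) { fmt.Println("delete") }, // obj may be a DeletedFinalStateUnknown
	})
	informer.Run(stopCh) // blocks; handlers are invoked from the informer's goroutines
}
```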
@@ -237,6 +237,19 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
 	return true
 }
 
+// `*sharedIndexInformer` implements SharedIndexInformer and has three
+// main components.  One is an indexed local cache, `indexer Indexer`.
+// The second main component is a Controller that pulls
+// objects/notifications using the ListerWatcher and pushes them into
+// a DeltaFIFO --- whose knownObjects is the informer's local cache
+// --- while concurrently Popping Deltas values from that fifo and
+// processing them with `sharedIndexInformer::HandleDeltas`.  Each
+// invocation of HandleDeltas, which is done with the fifo's lock
+// held, processes each Delta in turn.  For each Delta this both
+// updates the local cache and stuffs the relevant notification into
+// the sharedProcessor.  The third main component is that
+// sharedProcessor, which is responsible for relaying those
+// notifications to each of the informer's clients.
 type sharedIndexInformer struct {
 	indexer    Indexer
 	controller Controller
@@ -244,9 +257,13 @@ type sharedIndexInformer struct {
 	processor             *sharedProcessor
 	cacheMutationDetector MutationDetector
 
-	// This block is tracked to handle late initialization of the controller
 	listerWatcher ListerWatcher
-	objectType    runtime.Object
+
+	// objectType is an example object of the type this informer is
+	// expected to handle.  Only the type needs to be right, except
+	// that when that is `unstructured.Unstructured` the object's
+	// `"apiVersion"` and `"kind"` must also be right.
+	objectType runtime.Object
 
 	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
 	// shouldResync to check if any of our listeners need a resync.
@@ -485,6 +502,12 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
 	return nil
 }
 
+// sharedProcessor has a collection of processorListener and can
+// distribute a notification object to its listeners.  There are two
+// kinds of distribute operations.  The sync distributions go to a
+// subset of the listeners that (a) is recomputed in the occasional
+// calls to shouldResync and (b) every listener is initially put in.
+// The non-sync distributions go to every listener.
 type sharedProcessor struct {
 	listenersStarted bool
 	listenersLock    sync.RWMutex
@@ -576,6 +599,17 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
 	}
 }
 
+// processorListener relays notifications from a sharedProcessor to
+// one ResourceEventHandler --- using two goroutines, two unbuffered
+// channels, and an unbounded ring buffer.  The `add(notification)`
+// function sends the given notification to `addCh`.  One goroutine
+// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
+// using storage in the ring buffer while `nextCh` is not keeping up.
+// Another goroutine runs `run()`, which receives notifications from
+// `nextCh` and synchronously invokes the appropriate handler method.
+//
+// processorListener also keeps track of the adjusted requested resync
+// period of the listener.
 type processorListener struct {
 	nextCh chan interface{}
 	addCh  chan interface{}
@@ -589,11 +623,22 @@ type processorListener struct {
 	// we should try to do something better.
 	pendingNotifications buffer.RingGrowing
 
-	// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
+	// requestedResyncPeriod is how frequently the listener wants a
+	// full resync from the shared informer, but modified by two
+	// adjustments.  One is imposing a lower bound,
+	// `minimumResyncPeriod`.  The other is another lower bound, the
+	// sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only
+	// in AddEventHandlerWithResyncPeriod invocations made after the
+	// sharedProcessor starts and (b) only if the informer does
+	// resyncs at all.
 	requestedResyncPeriod time.Duration
-	// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
-	// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
-	// informer's overall resync check period.
+	// resyncPeriod is the threshold that will be used in the logic
+	// for this listener.  This value differs from
+	// requestedResyncPeriod only when the sharedIndexInformer does
+	// not do resyncs, in which case the value here is zero.  The
+	// actual time between resyncs depends on when the
+	// sharedProcessor's `shouldResync` function is invoked and when
+	// the sharedIndexInformer processes `Sync` type Delta objects.
 	resyncPeriod time.Duration
 	// nextResync is the earliest time the listener should get a full resync
 	nextResync time.Time
@@ -652,7 +697,7 @@ func (p *processorListener) pop() {
 
 func (p *processorListener) run() {
 	// this call blocks until the channel is closed. When a panic happens during the notification
-	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
+	// we will catch it, **the offending item will be skipped!**, and after a short delay (one minute)
 	// the next notification will be attempted. This is usually better than the alternative of never
 	// delivering again.
 	stopCh := make(chan struct{})
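
The pumping arrangement described in the new processorListener comment can be illustrated with a standalone sketch; this is not the actual client-go code, and a plain slice stands in for buffer.RingGrowing:

```go
package sketch

// pump moves notifications from addCh to nextCh, buffering in an
// unbounded slice whenever the consumer of nextCh is not keeping up.
func pump(addCh <-chan interface{}, nextCh chan<- interface{}) {
	var pending []interface{}
	for {
		// A nil channel disables its select case, so we only offer to
		// nextCh when something is actually buffered.
		var out chan<- interface{}
		var head interface{}
		if len(pending) > 0 {
			out = nextCh
			head = pending[0]
		}
		select {
		case item, ok := <-addCh:
			if !ok {
				return // producer closed addCh; stop pumping
			}
			pending = append(pending, item)
		case out <- head:
			pending = pending[1:]
		}
	}
}
```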

tools/cache/store.go vendored

@@ -23,27 +23,50 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 )
 
-// Store is a generic object storage interface. Reflector knows how to watch a server
-// and update a store. A generic store is provided, which allows Reflector to be used
-// as a local caching system, and an LRU store, which allows Reflector to work like a
-// queue of items yet to be processed.
+// Store is a generic object storage and processing interface.  A
+// Store holds a map from string keys to accumulators, and has
+// operations to add, update, and delete a given object to/from the
+// accumulator currently associated with a given key.  A Store also
+// knows how to extract the key from a given object, so many operations
+// are given only the object.
 //
-// Store makes no assumptions about stored object identity; it is the responsibility
-// of a Store implementation to provide a mechanism to correctly key objects and to
-// define the contract for obtaining objects by some arbitrary key type.
+// In the simplest Store implementations each accumulator is simply
+// the last given object, or empty after Delete, and thus the Store's
+// behavior is simple storage.
+//
+// Reflector knows how to watch a server and update a Store.  This
+// package provides a variety of implementations of Store.
 type Store interface {
+	// Add adds the given object to the accumulator associated with the given object's key
 	Add(obj interface{}) error
+
+	// Update updates the given object in the accumulator associated with the given object's key
 	Update(obj interface{}) error
+
+	// Delete deletes the given object from the accumulator associated with the given object's key
 	Delete(obj interface{}) error
+
+	// List returns a list of all the currently non-empty accumulators
 	List() []interface{}
+
+	// ListKeys returns a list of all the keys currently associated with non-empty accumulators
	ListKeys() []string
+
+	// Get returns the accumulator associated with the given object's key
 	Get(obj interface{}) (item interface{}, exists bool, err error)
+
+	// GetByKey returns the accumulator associated with the given key
 	GetByKey(key string) (item interface{}, exists bool, err error)
 
 	// Replace will delete the contents of the store, using instead the
 	// given list. Store takes ownership of the list, you should not reference
 	// it after calling this function.
 	Replace([]interface{}, string) error
+
+	// Resync is meaningless in the terms appearing here but has
+	// meaning in some implementations that have non-trivial
+	// additional behavior (e.g., DeltaFIFO).
 	Resync() error
 }
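
A small sketch of the accumulator vocabulary for the simplest Store implementation, cache.NewStore:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Each accumulator is just the last object given for its key,
	// and Delete empties the accumulator.
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns"}}

	store.Add(pod)
	_, exists, _ := store.GetByKey("ns/p")
	fmt.Println(exists) // true

	store.Delete(pod)
	_, exists, _ = store.GetByKey("ns/p")
	fmt.Println(exists) // false; "ns/p" is also gone from ListKeys()
}
```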
@@ -106,9 +129,8 @@ func SplitMetaNamespaceKey(key string) (namespace, name string, err error) {
 	return "", "", fmt.Errorf("unexpected key format: %q", key)
 }
 
-// cache responsibilities are limited to:
-//	1. Computing keys for objects via keyFunc
-//	2. Invoking methods of a ThreadSafeStorage interface
+// `*cache` implements Indexer in terms of a ThreadSafeStore and an
+// associated KeyFunc.
 type cache struct {
 	// cacheStorage bears the burden of thread safety for the cache
 	cacheStorage ThreadSafeStore
@@ -222,9 +244,9 @@ func (c *cache) Replace(list []interface{}, resourceVersion string) error {
 	return nil
 }
 
-// Resync touches all items in the store to force processing
+// Resync is meaningless for one of these
 func (c *cache) Resync() error {
-	return c.cacheStorage.Resync()
+	return nil
 }
 
 // NewStore returns a Store implemented simply with a map and a lock.

tools/cache/thread_safe_store.go vendored

@@ -23,7 +23,11 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 )
 
-// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
+// ThreadSafeStore is an interface that allows concurrent indexed
+// access to a storage backend.  It is like Indexer but does not
+// (necessarily) know how to extract the Store key from a given
+// object.
+//
 // TL;DR caveats: you must not modify anything returned by Get or List as it will break
 // the indexing feature in addition to not being thread safe.
 //
@@ -51,6 +55,7 @@ type ThreadSafeStore interface {
 	// AddIndexers adds more indexers to this store. If you call this after you already have data
 	// in the store, the results are undefined.
 	AddIndexers(newIndexers Indexers) error
+	// Resync is a no-op and is deprecated
 	Resync() error
 }
@@ -131,8 +136,8 @@ func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion st
 	}
 }
 
-// Index returns a list of items that match on the index function
-// Index is thread-safe so long as you treat all items as immutable
+// Index returns a list of items that match the given object on the index function.
+// Index is thread-safe so long as you treat all items as immutable.
 func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
 	c.lock.RLock()
 	defer c.lock.RUnlock()
@@ -142,37 +147,37 @@ func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{},
 		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
 	}
 
-	indexKeys, err := indexFunc(obj)
+	indexedValues, err := indexFunc(obj)
 	if err != nil {
 		return nil, err
 	}
 	index := c.indices[indexName]
 
-	var returnKeySet sets.String
-	if len(indexKeys) == 1 {
+	var storeKeySet sets.String
+	if len(indexedValues) == 1 {
 		// In majority of cases, there is exactly one value matching.
 		// Optimize the most common path - deduping is not needed here.
-		returnKeySet = index[indexKeys[0]]
+		storeKeySet = index[indexedValues[0]]
 	} else {
 		// Need to de-dupe the return list.
 		// Since multiple keys are allowed, this can happen.
-		returnKeySet = sets.String{}
-		for _, indexKey := range indexKeys {
-			for key := range index[indexKey] {
-				returnKeySet.Insert(key)
+		storeKeySet = sets.String{}
+		for _, indexedValue := range indexedValues {
+			for key := range index[indexedValue] {
+				storeKeySet.Insert(key)
 			}
 		}
 	}
 
-	list := make([]interface{}, 0, returnKeySet.Len())
-	for absoluteKey := range returnKeySet {
-		list = append(list, c.items[absoluteKey])
+	list := make([]interface{}, 0, storeKeySet.Len())
+	for storeKey := range storeKeySet {
+		list = append(list, c.items[storeKey])
 	}
 	return list, nil
 }
 
-// ByIndex returns a list of items that match an exact value on the index function
-func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
+// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
+func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
 	c.lock.RLock()
 	defer c.lock.RUnlock()
@@ -183,7 +188,7 @@ func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, erro
 	index := c.indices[indexName]
 
-	set := index[indexKey]
+	set := index[indexedValue]
 	list := make([]interface{}, 0, set.Len())
 	for key := range set {
 		list = append(list, c.items[key])
@@ -192,9 +197,9 @@ func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, erro
 	return list, nil
 }
 
-// IndexKeys returns a list of keys that match on the index function.
+// IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
 // IndexKeys is thread-safe so long as you treat all items as immutable.
-func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
+func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
 	c.lock.RLock()
 	defer c.lock.RUnlock()
@@ -205,7 +210,7 @@ func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error)
 	index := c.indices[indexName]
 
-	set := index[indexKey]
+	set := index[indexedValue]
 	return set.List(), nil
 }