From 5546d9f12f266a082805ad3a2293fe04e0029c78 Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Mon, 6 Jan 2020 01:49:29 -0500 Subject: [PATCH] Started commenting processors --- .../client-go/tools/cache/expiration_cache.go | 4 +-- .../src/k8s.io/client-go/tools/cache/index.go | 15 +++++---- .../client-go/tools/cache/shared_informer.go | 33 ++++++++++++++++--- .../src/k8s.io/client-go/tools/cache/store.go | 17 ++++------ .../tools/cache/thread_safe_store.go | 7 +++- 5 files changed, 52 insertions(+), 24 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/expiration_cache.go b/staging/src/k8s.io/client-go/tools/cache/expiration_cache.go index 14ad492ecc8..e687593f61e 100644 --- a/staging/src/k8s.io/client-go/tools/cache/expiration_cache.go +++ b/staging/src/k8s.io/client-go/tools/cache/expiration_cache.go @@ -194,9 +194,9 @@ func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) er return nil } -// Resync will touch all objects to put them into the processing queue +// Resync is a no-op for one of these func (c *ExpirationCache) Resync() error { - return c.cacheStorage.Resync() + return nil } // NewTTLStore creates and returns a ExpirationCache with a TTLPolicy diff --git a/staging/src/k8s.io/client-go/tools/cache/index.go b/staging/src/k8s.io/client-go/tools/cache/index.go index bbfb3b55f69..fa29e6a704b 100644 --- a/staging/src/k8s.io/client-go/tools/cache/index.go +++ b/staging/src/k8s.io/client-go/tools/cache/index.go @@ -23,12 +23,15 @@ import ( "k8s.io/apimachinery/pkg/util/sets" ) -// Indexer is a storage interface that lets you list objects using multiple indexing functions. -// There are three kinds of strings here. -// One is a storage key, as defined in the Store interface. -// Another kind is a name of an index. -// The third kind of string is an "indexed value", which is produced by an -// IndexFunc and can be a field value or any other string computed from the object. +// Indexer extends Store with multiple indices and restricts each +// accumulator to simply hold the current object (and be empty after +// Delete). +// +// There are three kinds of strings here: +// 1. a storage key, as defined in the Store interface, +// 2. a name of an index, and +// 3. an "indexed value", which is produced by an IndexFunc and +// can be a field value or any other string computed from the object. type Indexer interface { Store // Index returns the stored objects whose set of indexed values diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 6c08603cf3b..1a0531637e6 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -475,6 +475,12 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { return nil } +// sharedProcessor has a collection of processorListener and can +// distribute a notification object to its listeners. Each distribute +// operation is `sync` or not. The sync distributions go to a subset +// of the listeners that (a) is recomputed in the occasional calls to +// shouldResync and (b) every listener is initially put in. The +// non-sync distributions go to every listener. type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex @@ -566,6 +572,17 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati } } +// processorListener relays notifications from a sharedProcessor to +// one ResourceEventHandler --- using two goroutines, two unbuffered +// channels, and an unbounded ring buffer. The `add(notification)` +// function sends the given notification to `addCh`. One goroutine +// runs `pop()`, which pumps notifications from `addCh` to `nextCh` +// using storage in the ring buffer while `nextCh` is not keeping up. +// Another goroutine runs `run()`, which receives notifications from +// `nextCh` and synchronously invokes the appropriate handler method. +// +// processorListener also keeps track of the adjusted requested resync +// period of the listener. type processorListener struct { nextCh chan interface{} addCh chan interface{} @@ -579,11 +596,17 @@ type processorListener struct { // we should try to do something better. pendingNotifications buffer.RingGrowing - // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer + // requestedResyncPeriod is how frequently the listener wants a + // full resync from the shared informer, but is bounded below by + // `minimumResyncPeriod` and the sharedProcessor's + // `resyncCheckPeriod`. requestedResyncPeriod time.Duration - // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This - // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the - // informer's overall resync check period. + // resyncPeriod is the threshold that will be used in the logic + // for this listener. This value does not differ from + // requestedResyncPeriod. The actual time between resyncs depends + // on when the sharedProcessor's `shouldResync` function is + // invoked and when the sharedIndexInformer processes `Sync` type + // Delta objects. resyncPeriod time.Duration // nextResync is the earliest time the listener should get a full resync nextResync time.Time @@ -642,7 +665,7 @@ func (p *processorListener) pop() { func (p *processorListener) run() { // this call blocks until the channel is closed. When a panic happens during the notification - // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) + // we will catch it, **the offending item will be skipped!**, and after a short delay (one minute) // the next notification will be attempted. This is usually better than the alternative of never // delivering again. stopCh := make(chan struct{}) diff --git a/staging/src/k8s.io/client-go/tools/cache/store.go b/staging/src/k8s.io/client-go/tools/cache/store.go index eb71a94d97f..886e95d2deb 100644 --- a/staging/src/k8s.io/client-go/tools/cache/store.go +++ b/staging/src/k8s.io/client-go/tools/cache/store.go @@ -31,8 +31,8 @@ import ( // are given only the object. // // In the simplest Store implementations each accumulator is simply -// the last given object and thus the Store's behavior is simple -// storage. +// the last given object, or empty after Delete, and thus the Store's +// behavior is simple storage. // // Reflector knows how to watch a server and update a Store. This // package provides a variety of implementations of Store. @@ -66,9 +66,7 @@ type Store interface { // Resync is meaningless in the terms appearing here but has // meaning in some implementations that have non-trivial - // additional behavior. In general the idea is to tee up the - // current non-empty accumulators or their keys for - // reconsideration (whatever that means). + // additional behavior (e.g., DeltaFIFO). Resync() error } @@ -131,9 +129,8 @@ func SplitMetaNamespaceKey(key string) (namespace, name string, err error) { return "", "", fmt.Errorf("unexpected key format: %q", key) } -// cache responsibilities are limited to: -// 1. Computing keys for objects via keyFunc -// 2. Invoking methods of a ThreadSafeStorage interface +// `*cache` implements Indexer in terms of a ThreadSafeStore and an +// associated KeyFunc. type cache struct { // cacheStorage bears the burden of thread safety for the cache cacheStorage ThreadSafeStore @@ -247,9 +244,9 @@ func (c *cache) Replace(list []interface{}, resourceVersion string) error { return nil } -// Resync touches all items in the store to force processing +// Resync is meaningless for one of these func (c *cache) Resync() error { - return c.cacheStorage.Resync() + return nil } // NewStore returns a Store implemented simply with a map and a lock. diff --git a/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go b/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go index 775fe9a18fc..56251179b5f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go +++ b/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go @@ -23,7 +23,11 @@ import ( "k8s.io/apimachinery/pkg/util/sets" ) -// ThreadSafeStore is an interface that allows concurrent access to a storage backend. +// ThreadSafeStore is an interface that allows concurrent indexed +// access to a storage backend. It is like Indexer but does not +// (necessarily) know how to extract the Store key from a given +// object. +// // TL;DR caveats: you must not modify anything returned by Get or List as it will break // the indexing feature in addition to not being thread safe. // @@ -51,6 +55,7 @@ type ThreadSafeStore interface { // AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(newIndexers Indexers) error + // Resync is a no-op and is deprecated Resync() error }