Started commenting processors

This commit is contained in:
Mike Spreitzer 2020-01-06 01:49:29 -05:00
parent 0eca8ae9cd
commit 5546d9f12f
5 changed files with 52 additions and 24 deletions

View File

@ -194,9 +194,9 @@ func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) er
return nil
}
// Resync will touch all objects to put them into the processing queue
// Resync is a no-op for one of these
func (c *ExpirationCache) Resync() error {
return c.cacheStorage.Resync()
return nil
}
// NewTTLStore creates and returns a ExpirationCache with a TTLPolicy

View File

@ -23,12 +23,15 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)
// Indexer is a storage interface that lets you list objects using multiple indexing functions.
// There are three kinds of strings here.
// One is a storage key, as defined in the Store interface.
// Another kind is a name of an index.
// The third kind of string is an "indexed value", which is produced by an
// IndexFunc and can be a field value or any other string computed from the object.
// Indexer extends Store with multiple indices and restricts each
// accumulator to simply hold the current object (and be empty after
// Delete).
//
// There are three kinds of strings here:
// 1. a storage key, as defined in the Store interface,
// 2. a name of an index, and
// 3. an "indexed value", which is produced by an IndexFunc and
// can be a field value or any other string computed from the object.
type Indexer interface {
Store
// Index returns the stored objects whose set of indexed values

View File

@ -475,6 +475,12 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
return nil
}
// sharedProcessor has a collection of processorListener and can
// distribute a notification object to its listeners. Each distribute
// operation is `sync` or not. The sync distributions go to a subset
// of the listeners that (a) is recomputed in the occasional calls to
// shouldResync and (b) every listener is initially put in. The
// non-sync distributions go to every listener.
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
@ -566,6 +572,17 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Durati
}
}
// processorListener relays notifications from a sharedProcessor to
// one ResourceEventHandler --- using two goroutines, two unbuffered
// channels, and an unbounded ring buffer. The `add(notification)`
// function sends the given notification to `addCh`. One goroutine
// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
// using storage in the ring buffer while `nextCh` is not keeping up.
// Another goroutine runs `run()`, which receives notifications from
// `nextCh` and synchronously invokes the appropriate handler method.
//
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
@ -579,11 +596,17 @@ type processorListener struct {
// we should try to do something better.
pendingNotifications buffer.RingGrowing
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
// requestedResyncPeriod is how frequently the listener wants a
// full resync from the shared informer, but is bounded below by
// `minimumResyncPeriod` and the sharedProcessor's
// `resyncCheckPeriod`.
requestedResyncPeriod time.Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
// resyncPeriod is the threshold that will be used in the logic
// for this listener. This value does not differ from
// requestedResyncPeriod. The actual time between resyncs depends
// on when the sharedProcessor's `shouldResync` function is
// invoked and when the sharedIndexInformer processes `Sync` type
// Delta objects.
resyncPeriod time.Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time.Time
@ -642,7 +665,7 @@ func (p *processorListener) pop() {
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// we will catch it, **the offending item will be skipped!**, and after a short delay (one minute)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})

View File

@ -31,8 +31,8 @@ import (
// are given only the object.
//
// In the simplest Store implementations each accumulator is simply
// the last given object and thus the Store's behavior is simple
// storage.
// the last given object, or empty after Delete, and thus the Store's
// behavior is simple storage.
//
// Reflector knows how to watch a server and update a Store. This
// package provides a variety of implementations of Store.
@ -66,9 +66,7 @@ type Store interface {
// Resync is meaningless in the terms appearing here but has
// meaning in some implementations that have non-trivial
// additional behavior. In general the idea is to tee up the
// current non-empty accumulators or their keys for
// reconsideration (whatever that means).
// additional behavior (e.g., DeltaFIFO).
Resync() error
}
@ -131,9 +129,8 @@ func SplitMetaNamespaceKey(key string) (namespace, name string, err error) {
return "", "", fmt.Errorf("unexpected key format: %q", key)
}
// cache responsibilities are limited to:
// 1. Computing keys for objects via keyFunc
// 2. Invoking methods of a ThreadSafeStorage interface
// `*cache` implements Indexer in terms of a ThreadSafeStore and an
// associated KeyFunc.
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
@ -247,9 +244,9 @@ func (c *cache) Replace(list []interface{}, resourceVersion string) error {
return nil
}
// Resync touches all items in the store to force processing
// Resync is meaningless for one of these
func (c *cache) Resync() error {
return c.cacheStorage.Resync()
return nil
}
// NewStore returns a Store implemented simply with a map and a lock.

View File

@ -23,7 +23,11 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)
// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
// ThreadSafeStore is an interface that allows concurrent indexed
// access to a storage backend. It is like Indexer but does not
// (necessarily) know how to extract the Store key from a given
// object.
//
// TL;DR caveats: you must not modify anything returned by Get or List as it will break
// the indexing feature in addition to not being thread safe.
//
@ -51,6 +55,7 @@ type ThreadSafeStore interface {
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
// Resync is a no-op and is deprecated
Resync() error
}