From db5618cb1f9809b8655697b0fe722fc96c09eaee Mon Sep 17 00:00:00 2001 From: Mike Spreitzer Date: Mon, 6 Jan 2020 15:59:11 -0500 Subject: [PATCH] began turning attention to cache.Controller Kubernetes-commit: 72fa714d636dc2f614b251c78cd8297f7754b333 --- tools/cache/controller.go | 32 +++++++++++++++++++------------- tools/cache/reflector.go | 30 ++++++++++++++++++++---------- tools/cache/shared_informer.go | 22 ++++++++++++++-------- 3 files changed, 53 insertions(+), 31 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 27a1c52c..a89e5cd0 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -26,7 +26,16 @@ import ( "k8s.io/apimachinery/pkg/util/wait" ) -// Config contains all the settings for a Controller. +// This file implements a low-level controller that is used in +// sharedIndexInformer, which is an implementation of +// SharedIndexInformer. Such informers, in turn, are key components +// in the high level controllers that form the backbone of the +// Kubernetes control plane. Look at those for examples, or the +// example in +// https://github.com/kubernetes/client-go/tree/master/examples/workqueue +// . + +// Config contains all the settings for one of these low-level controllers. type Config struct { // The queue for your objects - has to be a DeltaFIFO due to // assumptions in the implementation. Your Process() function @@ -36,19 +45,16 @@ type Config struct { // Something that can list and watch your objects. ListerWatcher - // Something that can process your objects. + // Something that can process a popped Deltas. Process ProcessFunc - // The type of your objects. + // ObjectType is an example object of the type this controller is + // expected to handle. Only the type needs to be right, except + // that when that is `unstructured.Unstructured` the object's + // `"apiVersion"` must also be right. ObjectType runtime.Object - // Reprocess everything at least this often. - // Note that if it takes longer for you to clear the queue than this - // period, you will end up processing items in the order determined - // by FIFO.Replace(). Currently, this is random. If this is a - // problem, we can change that replacement policy to append new - // things to the end of the queue instead of replacing the entire - // queue. + // FullResyncPeriod is the period at which ShouldResync is invoked. FullResyncPeriod time.Duration // ShouldResync, if specified, is invoked when the controller's reflector determines the next @@ -71,7 +77,7 @@ type ShouldResyncFunc func() bool // ProcessFunc processes a single object. type ProcessFunc func(obj interface{}) error -// Controller is a generic controller framework. +// `*controller` implements Controller type controller struct { config Config reflector *Reflector @@ -79,7 +85,7 @@ type controller struct { clock clock.Clock } -// Controller is a generic controller framework. +// Controller is a low-level controller used in sharedIndexInformer. type Controller interface { Run(stopCh <-chan struct{}) HasSynced() bool @@ -95,7 +101,7 @@ func New(c *Config) Controller { return ctlr } -// Run begins processing items, and will continue until a value is sent down stopCh. +// Run begins processing items, and will continue until a value is sent down stopCh or it is closed. // It's an error to call Run more than once. // Run blocks; call via go. func (c *controller) Run(stopCh <-chan struct{}) { diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index fc785c2c..4bbd4f59 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -55,7 +55,10 @@ type Reflector struct { // stringification of expectedType otherwise. It is for display // only, and should not be used for parsing or comparison. expectedTypeName string - // The type of object we expect to place in the store. + // An example object of the type we expect to place in the store. + // Only the type needs to be right, except that when that is + // `unstructured.Unstructured` the object's `"apiVersion"` must + // also be right. expectedType reflect.Type // The GVK of the object we expect to place in the store if unstructured. expectedGVK *schema.GroupVersionKind @@ -63,10 +66,12 @@ type Reflector struct { store Store // listerWatcher is used to perform lists and watches. listerWatcher ListerWatcher - // period controls timing between one watch ending and - // the beginning of the next one. + // period controls timing between an unsuccessful watch ending and + // the beginning of the next list. period time.Duration + // The period at which ShouldResync is invoked resyncPeriod time.Duration + // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked ShouldResync func() bool // clock allows tests to manipulate time clock clock.Clock @@ -98,12 +103,16 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa return indexer, reflector } -// NewReflector creates a new Reflector object which will keep the given store up to -// date with the server's contents for the given resource. Reflector promises to -// only put things in the store that have the type of expectedType, unless expectedType -// is nil. If resyncPeriod is non-zero, then lists will be executed after every -// resyncPeriod, so that you can use reflectors to periodically process everything as -// well as incrementally processing the things that change. +// NewReflector creates a new Reflector object which will keep the +// given store up to date with the server's contents for the given +// resource. Reflector promises to only put things in the store that +// have the type of expectedType, unless expectedType is nil. If +// resyncPeriod is non-zero, then the reflector will periodically +// consult its ShouldResync function to determine whether to invoke +// the Store's Resync operation; `ShouldResync==nil` means always +// "yes". This enables you to use reflectors to periodically process +// everything as well as incrementally processing the things that +// change. func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) } @@ -147,7 +156,8 @@ func (r *Reflector) setExpectedType(expectedType interface{}) { // call chains to NewReflector, so they'd be low entropy names for reflectors var internalPackages = []string{"client-go/tools/cache/"} -// Run starts a watch and handles watch events. Will restart the watch if it is closed. +// Run repeatedly uses the reflector's ListAndWatch to fetch all the +// objects and subsequent deltas. // Run will exit when stopCh is closed. func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 18b16589..ba7a00ad 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -168,21 +168,21 @@ type SharedIndexInformer interface { } // NewSharedInformer creates a new instance for the listwatcher. -func NewSharedInformer(lw ListerWatcher, objOfWatchedType runtime.Object, resyncPeriod time.Duration) SharedInformer { - return NewSharedIndexInformer(lw, objOfWatchedType, resyncPeriod, Indexers{}) +func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, resyncPeriod time.Duration) SharedInformer { + return NewSharedIndexInformer(lw, exampleObject, resyncPeriod, Indexers{}) } // NewSharedIndexInformer creates a new instance for the listwatcher. -func NewSharedIndexInformer(lw ListerWatcher, objOfWatchedType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { +func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, - objectType: objOfWatchedType, + objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, - cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objOfWatchedType)), + cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), clock: realClock, } return sharedIndexInformer @@ -245,6 +245,11 @@ type sharedIndexInformer struct { cacheMutationDetector MutationDetector listerWatcher ListerWatcher + + // objectType is an example object of the type this informer is + // expected to handle. Only the type needs to be right, except + // that when that is `unstructured.Unstructured` the object's + // `"apiVersion"` must also be right. objectType runtime.Object // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call @@ -609,9 +614,10 @@ type processorListener struct { // full resync from the shared informer, but modified by two // adjustments. One is imposing a lower bound, // `minimumResyncPeriod`. The other is another lower bound, the - // sharedProcessor's `resyncCheckPeriod`, that is imposed in - // AddEventHandlerWithResyncPeriod invocations made after the - // sharedProcessor starts. + // sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only + // in AddEventHandlerWithResyncPeriod invocations made after the + // sharedProcessor starts and (b) only if the informer does + // resyncs at all. requestedResyncPeriod time.Duration // resyncPeriod is the threshold that will be used in the logic // for this listener. This value differs from