began turning attention to cache.Controller

Kubernetes-commit: 72fa714d636dc2f614b251c78cd8297f7754b333
This commit is contained in:
Mike Spreitzer 2020-01-06 15:59:11 -05:00 committed by Kubernetes Publisher
parent 261b8a00e4
commit db5618cb1f
3 changed files with 53 additions and 31 deletions

View File

@ -26,7 +26,16 @@ import (
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
) )
// Config contains all the settings for a Controller. // This file implements a low-level controller that is used in
// sharedIndexInformer, which is an implementation of
// SharedIndexInformer. Such informers, in turn, are key components
// in the high level controllers that form the backbone of the
// Kubernetes control plane. Look at those for examples, or the
// example in
// https://github.com/kubernetes/client-go/tree/master/examples/workqueue
// .
// Config contains all the settings for one of these low-level controllers.
type Config struct { type Config struct {
// The queue for your objects - has to be a DeltaFIFO due to // The queue for your objects - has to be a DeltaFIFO due to
// assumptions in the implementation. Your Process() function // assumptions in the implementation. Your Process() function
@ -36,19 +45,16 @@ type Config struct {
// Something that can list and watch your objects. // Something that can list and watch your objects.
ListerWatcher ListerWatcher
// Something that can process your objects. // Something that can process a popped Deltas.
Process ProcessFunc Process ProcessFunc
// The type of your objects. // ObjectType is an example object of the type this controller is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` must also be right.
ObjectType runtime.Object ObjectType runtime.Object
// Reprocess everything at least this often. // FullResyncPeriod is the period at which ShouldResync is invoked.
// Note that if it takes longer for you to clear the queue than this
// period, you will end up processing items in the order determined
// by FIFO.Replace(). Currently, this is random. If this is a
// problem, we can change that replacement policy to append new
// things to the end of the queue instead of replacing the entire
// queue.
FullResyncPeriod time.Duration FullResyncPeriod time.Duration
// ShouldResync, if specified, is invoked when the controller's reflector determines the next // ShouldResync, if specified, is invoked when the controller's reflector determines the next
@ -71,7 +77,7 @@ type ShouldResyncFunc func() bool
// ProcessFunc processes a single object. // ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error type ProcessFunc func(obj interface{}) error
// Controller is a generic controller framework. // `*controller` implements Controller
type controller struct { type controller struct {
config Config config Config
reflector *Reflector reflector *Reflector
@ -79,7 +85,7 @@ type controller struct {
clock clock.Clock clock clock.Clock
} }
// Controller is a generic controller framework. // Controller is a low-level controller used in sharedIndexInformer.
type Controller interface { type Controller interface {
Run(stopCh <-chan struct{}) Run(stopCh <-chan struct{})
HasSynced() bool HasSynced() bool
@ -95,7 +101,7 @@ func New(c *Config) Controller {
return ctlr return ctlr
} }
// Run begins processing items, and will continue until a value is sent down stopCh. // Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once. // It's an error to call Run more than once.
// Run blocks; call via go. // Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) { func (c *controller) Run(stopCh <-chan struct{}) {

View File

@ -55,7 +55,10 @@ type Reflector struct {
// stringification of expectedType otherwise. It is for display // stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison. // only, and should not be used for parsing or comparison.
expectedTypeName string expectedTypeName string
// The type of object we expect to place in the store. // An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` must
// also be right.
expectedType reflect.Type expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured. // The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind expectedGVK *schema.GroupVersionKind
@ -63,10 +66,12 @@ type Reflector struct {
store Store store Store
// listerWatcher is used to perform lists and watches. // listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher listerWatcher ListerWatcher
// period controls timing between one watch ending and // period controls timing between an unsuccessful watch ending and
// the beginning of the next one. // the beginning of the next list.
period time.Duration period time.Duration
// The period at which ShouldResync is invoked
resyncPeriod time.Duration resyncPeriod time.Duration
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
ShouldResync func() bool ShouldResync func() bool
// clock allows tests to manipulate time // clock allows tests to manipulate time
clock clock.Clock clock clock.Clock
@ -98,12 +103,16 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
return indexer, reflector return indexer, reflector
} }
// NewReflector creates a new Reflector object which will keep the given store up to // NewReflector creates a new Reflector object which will keep the
// date with the server's contents for the given resource. Reflector promises to // given store up to date with the server's contents for the given
// only put things in the store that have the type of expectedType, unless expectedType // resource. Reflector promises to only put things in the store that
// is nil. If resyncPeriod is non-zero, then lists will be executed after every // have the type of expectedType, unless expectedType is nil. If
// resyncPeriod, so that you can use reflectors to periodically process everything as // resyncPeriod is non-zero, then the reflector will periodically
// well as incrementally processing the things that change. // consult its ShouldResync function to determine whether to invoke
// the Store's Resync operation; `ShouldResync==nil` means always
// "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
} }
@ -147,7 +156,8 @@ func (r *Reflector) setExpectedType(expectedType interface{}) {
// call chains to NewReflector, so they'd be low entropy names for reflectors // call chains to NewReflector, so they'd be low entropy names for reflectors
var internalPackages = []string{"client-go/tools/cache/"} var internalPackages = []string{"client-go/tools/cache/"}
// Run starts a watch and handles watch events. Will restart the watch if it is closed. // Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed. // Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) { func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)

View File

@ -168,21 +168,21 @@ type SharedIndexInformer interface {
} }
// NewSharedInformer creates a new instance for the listwatcher. // NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, objOfWatchedType runtime.Object, resyncPeriod time.Duration) SharedInformer { func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, resyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, objOfWatchedType, resyncPeriod, Indexers{}) return NewSharedIndexInformer(lw, exampleObject, resyncPeriod, Indexers{})
} }
// NewSharedIndexInformer creates a new instance for the listwatcher. // NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objOfWatchedType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{} realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{ sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock}, processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw, listerWatcher: lw,
objectType: objOfWatchedType, objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod, resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objOfWatchedType)), cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock, clock: realClock,
} }
return sharedIndexInformer return sharedIndexInformer
@ -245,6 +245,11 @@ type sharedIndexInformer struct {
cacheMutationDetector MutationDetector cacheMutationDetector MutationDetector
listerWatcher ListerWatcher listerWatcher ListerWatcher
// objectType is an example object of the type this informer is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` must also be right.
objectType runtime.Object objectType runtime.Object
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
@ -609,9 +614,10 @@ type processorListener struct {
// full resync from the shared informer, but modified by two // full resync from the shared informer, but modified by two
// adjustments. One is imposing a lower bound, // adjustments. One is imposing a lower bound,
// `minimumResyncPeriod`. The other is another lower bound, the // `minimumResyncPeriod`. The other is another lower bound, the
// sharedProcessor's `resyncCheckPeriod`, that is imposed in // sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only
// AddEventHandlerWithResyncPeriod invocations made after the // in AddEventHandlerWithResyncPeriod invocations made after the
// sharedProcessor starts. // sharedProcessor starts and (b) only if the informer does
// resyncs at all.
requestedResyncPeriod time.Duration requestedResyncPeriod time.Duration
// resyncPeriod is the threshold that will be used in the logic // resyncPeriod is the threshold that will be used in the logic
// for this listener. This value differs from // for this listener. This value differs from