Reflector: support logging Unstructured type

Add an annotation that can be added to the exampleType passed to
NewReflector to indicate the expected type for the Reflector. This is
useful for types such as unstuctured.Unstructured, which, when used with
a dynamic informer, do not have their TypeMeta filled in.

Signed-off-by: Andy Goldstein <andy.goldstein@redhat.com>
This commit is contained in:
Andy Goldstein 2022-08-17 15:49:26 -04:00
parent 3e26e104bd
commit 474fc8c523
5 changed files with 228 additions and 97 deletions

View File

@ -120,7 +120,7 @@ func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{})
func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer { func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
return &dynamicInformer{ return &dynamicInformer{
gvr: gvr, gvr: gvr,
informer: cache.NewSharedIndexInformer( informer: cache.NewSharedIndexInformerWithOptions(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil { if tweakListOptions != nil {
@ -136,8 +136,11 @@ func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersio
}, },
}, },
&unstructured.Unstructured{}, &unstructured.Unstructured{},
resyncPeriod, cache.SharedIndexInformerOptions{
indexers, ResyncPeriod: resyncPeriod,
Indexers: indexers,
ObjectDescription: gvr.String(),
},
), ),
} }
} }

View File

@ -50,11 +50,12 @@ type Config struct {
Process ProcessFunc Process ProcessFunc
// ObjectType is an example object of the type this controller is // ObjectType is an example object of the type this controller is
// expected to handle. Only the type needs to be right, except // expected to handle.
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
ObjectType runtime.Object ObjectType runtime.Object
// ObjectDescription is the description to use when logging type-specific information about this controller.
ObjectDescription string
// FullResyncPeriod is the period at which ShouldResync is considered. // FullResyncPeriod is the period at which ShouldResync is considered.
FullResyncPeriod time.Duration FullResyncPeriod time.Duration
@ -131,11 +132,14 @@ func (c *controller) Run(stopCh <-chan struct{}) {
<-stopCh <-stopCh
c.config.Queue.Close() c.config.Queue.Close()
}() }()
r := NewReflector( r := NewReflectorWithOptions(
c.config.ListerWatcher, c.config.ListerWatcher,
c.config.ObjectType, c.config.ObjectType,
c.config.Queue, c.config.Queue,
c.config.FullResyncPeriod, ReflectorOptions{
ResyncPeriod: c.config.FullResyncPeriod,
TypeDescription: c.config.ObjectDescription,
},
) )
r.ShouldResync = c.config.ShouldResync r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize r.WatchListPageSize = c.config.WatchListPageSize

View File

@ -54,7 +54,7 @@ type Reflector struct {
// will be the stringification of expectedGVK if provided, and the // will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display // stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison. // only, and should not be used for parsing or comparison.
expectedTypeName string typeDescription string
// An example object of the type we expect to place in the store. // An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is // Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` and // `unstructured.Unstructured` the object's `"apiVersion"` and
@ -131,13 +131,13 @@ func DefaultWatchErrorHandler(r *Reflector, err error) {
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV. // has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object. // So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
case err == io.EOF: case err == io.EOF:
// watch closed normally // watch closed normally
case err == io.ErrUnexpectedEOF: case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err) klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err)
default: default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err)) utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err))
} }
} }
@ -155,7 +155,37 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
return indexer, reflector return indexer, reflector
} }
// NewReflector creates a new Reflector object which will keep the // NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack
// that is outside this package. See NewReflectorWithOptions for further information.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
}
// NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further
// information.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
}
// ReflectorOptions configures a Reflector.
type ReflectorOptions struct {
// Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line
// in the call stack that is outside this package.
Name string
// TypeDescription is the Reflector's type description. If unset/unspecified, the type description is defaulted
// using the following rules: if the expectedType passed to NewReflectorWithOptions was nil, the type description is
// "<unspecified>". If the expectedType is an instance of *unstructured.Unstructured and its apiVersion and kind fields
// are set, the type description is the string encoding of those. Otherwise, the type description is set to the
// go type of expectedType..
TypeDescription string
// ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0
// (do not resync).
ResyncPeriod time.Duration
}
// NewReflectorWithOptions creates a new Reflector object which will keep the
// given store up to date with the server's contents for the given // given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that // resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If // have the type of expectedType, unless expectedType is nil. If
@ -165,49 +195,70 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
// "yes". This enables you to use reflectors to periodically process // "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that // everything as well as incrementally processing the things that
// change. // change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
realClock := &clock.RealClock{} realClock := &clock.RealClock{}
r := &Reflector{ r := &Reflector{
name: name, name: options.Name,
listerWatcher: lw, resyncPeriod: options.ResyncPeriod,
store: store, typeDescription: options.TypeDescription,
listerWatcher: lw,
store: store,
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock, clock: realClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
expectedType: reflect.TypeOf(expectedType),
} }
r.setExpectedType(expectedType)
if r.name == "" {
r.name = naming.GetNameFromCallsite(internalPackages...)
}
r.setTypeDescription(expectedType)
r.setExpectedGVK(expectedType)
return r return r
} }
func (r *Reflector) setExpectedType(expectedType interface{}) { func (r *Reflector) setTypeDescription(expectedType interface{}) {
r.expectedType = reflect.TypeOf(expectedType) if r.typeDescription != "" {
if r.expectedType == nil {
r.expectedTypeName = defaultExpectedTypeName
return return
} }
r.expectedTypeName = r.expectedType.String() if expectedType == nil {
r.typeDescription = defaultExpectedTypeName
if obj, ok := expectedType.(*unstructured.Unstructured); ok { } else {
// Use gvk to check that watch event objects are of the desired type. r.typeDescription = reflect.TypeOf(expectedType).String()
gvk := obj.GroupVersionKind()
if gvk.Empty() {
klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name)
return
}
r.expectedGVK = &gvk
r.expectedTypeName = gvk.String()
} }
obj, ok := expectedType.(*unstructured.Unstructured)
if !ok {
return
}
gvk := obj.GroupVersionKind()
if gvk.Empty() {
return
}
r.typeDescription = gvk.String()
}
func (r *Reflector) setExpectedGVK(expectedType interface{}) {
obj, ok := expectedType.(*unstructured.Unstructured)
if !ok {
return
}
gvk := obj.GroupVersionKind()
if gvk.Empty() {
return
}
r.expectedGVK = &gvk
} }
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common // internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
@ -218,13 +269,13 @@ var internalPackages = []string{"client-go/tools/cache/"}
// objects and subsequent deltas. // objects and subsequent deltas.
// Run will exit when stopCh is closed. // Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) { func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
wait.BackoffUntil(func() { wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil { if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err) r.watchErrorHandler(r, err)
} }
}, r.backoffManager, true, stopCh) }, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
} }
var ( var (
@ -254,7 +305,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// and then use the resource version to watch. // and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch. // It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
err := r.list(stopCh) err := r.list(stopCh)
if err != nil { if err != nil {
@ -326,7 +377,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
return err return err
} }
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh) err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
retry.After(err) retry.After(err)
if err != nil { if err != nil {
if err != errorStopRequested { if err != errorStopRequested {
@ -335,16 +386,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV. // has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object. // So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
case apierrors.IsTooManyRequests(err): case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName) klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription)
<-r.initConnBackoffManager.Backoff().C() <-r.initConnBackoffManager.Backoff().C()
continue continue
case apierrors.IsInternalError(err) && retry.ShouldRetry(): case apierrors.IsInternalError(err) && retry.ShouldRetry():
klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err) klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err)
continue continue
default: default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err)
} }
} }
return nil return nil
@ -421,8 +472,8 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
} }
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err}) initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
if err != nil { if err != nil {
klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err) klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err)
return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err) return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
} }
// We check if the list was paginated and if so set the paginatedResult based on that. // We check if the list was paginated and if so set the paginatedResult based on that.

View File

@ -138,7 +138,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
go func() { go func() {
fw.Stop() fw.Stop()
}() }()
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop)
if err == nil { if err == nil {
t.Errorf("unexpected non-error") t.Errorf("unexpected non-error")
} }
@ -157,7 +157,7 @@ func TestReflectorWatchHandler(t *testing.T) {
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
fw.Stop() fw.Stop()
}() }()
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop)
if err != nil { if err != nil {
t.Errorf("unexpected error %v", err) t.Errorf("unexpected error %v", err)
} }
@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) {
fw := watch.NewFake() fw := watch.NewFake()
stopWatch := make(chan struct{}, 1) stopWatch := make(chan struct{}, 1)
stopWatch <- struct{}{} stopWatch <- struct{}{}
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch) err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch)
if err != errorStopRequested { if err != errorStopRequested {
t.Errorf("expected stop error, got %q", err) t.Errorf("expected stop error, got %q", err)
} }
@ -979,7 +979,7 @@ func TestReflectorFullListIfTooLarge(t *testing.T) {
} }
} }
func TestReflectorSetExpectedType(t *testing.T) { func TestReflectorSetTypeDescription(t *testing.T) {
obj := &unstructured.Unstructured{} obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{ gvk := schema.GroupVersionKind{
Group: "mygroup", Group: "mygroup",
@ -987,42 +987,86 @@ func TestReflectorSetExpectedType(t *testing.T) {
Kind: "MyKind", Kind: "MyKind",
} }
obj.SetGroupVersionKind(gvk) obj.SetGroupVersionKind(gvk)
testCases := map[string]struct { testCases := map[string]struct {
inputType interface{} inputType interface{}
expectedTypeName string customDescription string
expectedType reflect.Type expectedTypeDescription string
expectedGVK *schema.GroupVersionKind
}{ }{
"Nil type": { "Nil type": {
expectedTypeName: defaultExpectedTypeName, expectedTypeDescription: defaultExpectedTypeName,
}, },
"Normal type": { "Normal type": {
inputType: &v1.Pod{}, inputType: &v1.Pod{},
expectedTypeName: "*v1.Pod", expectedTypeDescription: "*v1.Pod",
expectedType: reflect.TypeOf(&v1.Pod{}), },
"Normal type with custom description": {
inputType: &v1.Pod{},
customDescription: "foo",
expectedTypeDescription: "foo",
}, },
"Unstructured type without GVK": { "Unstructured type without GVK": {
inputType: &unstructured.Unstructured{}, inputType: &unstructured.Unstructured{},
expectedTypeName: "*unstructured.Unstructured", expectedTypeDescription: "*unstructured.Unstructured",
expectedType: reflect.TypeOf(&unstructured.Unstructured{}), },
"Unstructured type without GVK, with custom description": {
inputType: &unstructured.Unstructured{},
customDescription: "foo",
expectedTypeDescription: "foo",
}, },
"Unstructured type with GVK": { "Unstructured type with GVK": {
inputType: obj, inputType: obj,
expectedTypeName: gvk.String(), expectedTypeDescription: gvk.String(),
expectedType: reflect.TypeOf(&unstructured.Unstructured{}), },
expectedGVK: &gvk, "Unstructured type with GVK, with custom type description": {
inputType: obj,
customDescription: "foo",
expectedTypeDescription: "foo",
},
}
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
r := &Reflector{
typeDescription: tc.customDescription,
}
r.setTypeDescription(tc.inputType)
if tc.expectedTypeDescription != r.typeDescription {
t.Fatalf("Expected typeDescription %v, got %v", tc.expectedTypeDescription, r.typeDescription)
}
})
}
}
func TestReflectorSetExpectedGVK(t *testing.T) {
obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{
Group: "mygroup",
Version: "v1",
Kind: "MyKind",
}
obj.SetGroupVersionKind(gvk)
testCases := map[string]struct {
inputType interface{}
expectedGVK *schema.GroupVersionKind
}{
"Nil type": {},
"Some non Unstructured type": {
inputType: &v1.Pod{},
},
"Unstructured type without GVK": {
inputType: &unstructured.Unstructured{},
},
"Unstructured type with GVK": {
inputType: obj,
expectedGVK: &gvk,
}, },
} }
for testName, tc := range testCases { for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) { t.Run(testName, func(t *testing.T) {
r := &Reflector{} r := &Reflector{}
r.setExpectedType(tc.inputType) r.setExpectedGVK(tc.inputType)
if tc.expectedType != r.expectedType {
t.Fatalf("Expected expectedType %v, got %v", tc.expectedType, r.expectedType)
}
if tc.expectedTypeName != r.expectedTypeName {
t.Fatalf("Expected expectedTypeName %v, got %v", tc.expectedTypeName, r.expectedTypeName)
}
gvkNotEqual := (tc.expectedGVK == nil) != (r.expectedGVK == nil) gvkNotEqual := (tc.expectedGVK == nil) != (r.expectedGVK == nil)
if tc.expectedGVK != nil && r.expectedGVK != nil { if tc.expectedGVK != nil && r.expectedGVK != nil {
gvkNotEqual = *tc.expectedGVK != *r.expectedGVK gvkNotEqual = *tc.expectedGVK != *r.expectedGVK

View File

@ -223,14 +223,26 @@ type SharedIndexInformer interface {
GetIndexer() Indexer GetIndexer() Indexer
} }
// NewSharedInformer creates a new instance for the listwatcher. // NewSharedInformer creates a new instance for the ListerWatcher. See NewSharedIndexInformerWithOptions for full details.
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer { func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{}) return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
} }
// NewSharedIndexInformer creates a new instance for the listwatcher. // NewSharedIndexInformer creates a new instance for the ListerWatcher and specified Indexers. See
// The created informer will not do resyncs if the given // NewSharedIndexInformerWithOptions for full details.
// defaultEventHandlerResyncPeriod is zero. Otherwise: for each func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
return NewSharedIndexInformerWithOptions(
lw,
exampleObject,
SharedIndexInformerOptions{
ResyncPeriod: defaultEventHandlerResyncPeriod,
Indexers: indexers,
},
)
}
// NewSharedIndexInformerWithOptions creates a new instance for the ListerWatcher.
// The created informer will not do resyncs if options.ResyncPeriod is zero. Otherwise: for each
// handler that with a non-zero requested resync period, whether added // handler that with a non-zero requested resync period, whether added
// before or after the informer starts, the nominal resync period is // before or after the informer starts, the nominal resync period is
// the requested resync period rounded up to a multiple of the // the requested resync period rounded up to a multiple of the
@ -238,21 +250,36 @@ func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEv
// checking period is established when the informer starts running, // checking period is established when the informer starts running,
// and is the maximum of (a) the minimum of the resync periods // and is the maximum of (a) the minimum of the resync periods
// requested before the informer starts and the // requested before the informer starts and the
// defaultEventHandlerResyncPeriod given here and (b) the constant // options.ResyncPeriod given here and (b) the constant
// `minimumResyncPeriod` defined in this file. // `minimumResyncPeriod` defined in this file.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
realClock := &clock.RealClock{} realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
return &sharedIndexInformer{
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
processor: &sharedProcessor{clock: realClock}, processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw, listerWatcher: lw,
objectType: exampleObject, objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod, objectDescription: options.ObjectDescription,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, resyncCheckPeriod: options.ResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), defaultEventHandlerResyncPeriod: options.ResyncPeriod,
clock: realClock, clock: realClock,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
} }
return sharedIndexInformer }
// SharedIndexInformerOptions configures a sharedIndexInformer.
type SharedIndexInformerOptions struct {
// ResyncPeriod is the default event handler resync period and resync check
// period. If unset/unspecified, these are defaulted to 0 (do not resync).
ResyncPeriod time.Duration
// Indexers is the sharedIndexInformer's indexers. If unset/unspecified, no indexers are configured.
Indexers Indexers
// ObjectDescription is the sharedIndexInformer's object description. This is passed through to the
// underlying Reflector's type description.
ObjectDescription string
} }
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. // InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
@ -326,12 +353,13 @@ type sharedIndexInformer struct {
listerWatcher ListerWatcher listerWatcher ListerWatcher
// objectType is an example object of the type this informer is // objectType is an example object of the type this informer is expected to handle. If set, an event
// expected to handle. Only the type needs to be right, except // with an object with a mismatching type is dropped instead of being delivered to listeners.
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
objectType runtime.Object objectType runtime.Object
// objectDescription is the description of this informer's objects. This typically defaults to
objectDescription string
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync. // shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time.Duration resyncCheckPeriod time.Duration
@ -425,12 +453,13 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
}) })
cfg := &Config{ cfg := &Config{
Queue: fifo, Queue: fifo,
ListerWatcher: s.listerWatcher, ListerWatcher: s.listerWatcher,
ObjectType: s.objectType, ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod, ObjectDescription: s.objectDescription,
RetryOnError: false, FullResyncPeriod: s.resyncCheckPeriod,
ShouldResync: s.processor.shouldResync, RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas, Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler, WatchErrorHandler: s.watchErrorHandler,