Merge pull request #111898 from ncdc/reflector-unstructured-fixes

Reflector: support logging Unstructured type
This commit is contained in:
Kubernetes Prow Robot 2022-12-09 15:42:44 -08:00 committed by GitHub
commit 054274d77c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 212 additions and 101 deletions

View File

@ -120,7 +120,7 @@ func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{})
func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer { func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
return &dynamicInformer{ return &dynamicInformer{
gvr: gvr, gvr: gvr,
informer: cache.NewSharedIndexInformer( informer: cache.NewSharedIndexInformerWithOptions(
&cache.ListWatch{ &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil { if tweakListOptions != nil {
@ -136,8 +136,11 @@ func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersio
}, },
}, },
&unstructured.Unstructured{}, &unstructured.Unstructured{},
resyncPeriod, cache.SharedIndexInformerOptions{
indexers, ResyncPeriod: resyncPeriod,
Indexers: indexers,
ObjectDescription: gvr.String(),
},
), ),
} }
} }

View File

@ -50,11 +50,12 @@ type Config struct {
Process ProcessFunc Process ProcessFunc
// ObjectType is an example object of the type this controller is // ObjectType is an example object of the type this controller is
// expected to handle. Only the type needs to be right, except // expected to handle.
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
ObjectType runtime.Object ObjectType runtime.Object
// ObjectDescription is the description to use when logging type-specific information about this controller.
ObjectDescription string
// FullResyncPeriod is the period at which ShouldResync is considered. // FullResyncPeriod is the period at which ShouldResync is considered.
FullResyncPeriod time.Duration FullResyncPeriod time.Duration
@ -131,11 +132,14 @@ func (c *controller) Run(stopCh <-chan struct{}) {
<-stopCh <-stopCh
c.config.Queue.Close() c.config.Queue.Close()
}() }()
r := NewReflector( r := NewReflectorWithOptions(
c.config.ListerWatcher, c.config.ListerWatcher,
c.config.ObjectType, c.config.ObjectType,
c.config.Queue, c.config.Queue,
c.config.FullResyncPeriod, ReflectorOptions{
ResyncPeriod: c.config.FullResyncPeriod,
TypeDescription: c.config.ObjectDescription,
},
) )
r.ShouldResync = c.config.ShouldResync r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize r.WatchListPageSize = c.config.WatchListPageSize

View File

@ -54,7 +54,7 @@ type Reflector struct {
// will be the stringification of expectedGVK if provided, and the // will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display // stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison. // only, and should not be used for parsing or comparison.
expectedTypeName string typeDescription string
// An example object of the type we expect to place in the store. // An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is // Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` and // `unstructured.Unstructured` the object's `"apiVersion"` and
@ -131,13 +131,13 @@ func DefaultWatchErrorHandler(r *Reflector, err error) {
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV. // has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object. // So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
case err == io.EOF: case err == io.EOF:
// watch closed normally // watch closed normally
case err == io.ErrUnexpectedEOF: case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err) klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err)
default: default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err)) utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err))
} }
} }
@ -155,7 +155,37 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
return indexer, reflector return indexer, reflector
} }
// NewReflector creates a new Reflector object which will keep the // NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack
// that is outside this package. See NewReflectorWithOptions for further information.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
}
// NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further
// information.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
}
// ReflectorOptions configures a Reflector.
type ReflectorOptions struct {
// Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line
// in the call stack that is outside this package.
Name string
// TypeDescription is the Reflector's type description. If unset/unspecified, the type description is defaulted
// using the following rules: if the expectedType passed to NewReflectorWithOptions was nil, the type description is
// "<unspecified>". If the expectedType is an instance of *unstructured.Unstructured and its apiVersion and kind fields
// are set, the type description is the string encoding of those. Otherwise, the type description is set to the
// go type of expectedType..
TypeDescription string
// ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0
// (do not resync).
ResyncPeriod time.Duration
}
// NewReflectorWithOptions creates a new Reflector object which will keep the
// given store up to date with the server's contents for the given // given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that // resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If // have the type of expectedType, unless expectedType is nil. If
@ -165,49 +195,71 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa
// "yes". This enables you to use reflectors to periodically process // "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that // everything as well as incrementally processing the things that
// change. // change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
realClock := &clock.RealClock{} realClock := &clock.RealClock{}
r := &Reflector{ r := &Reflector{
name: name, name: options.Name,
listerWatcher: lw, resyncPeriod: options.ResyncPeriod,
store: store, typeDescription: options.TypeDescription,
listerWatcher: lw,
store: store,
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff. // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock, clock: realClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
expectedType: reflect.TypeOf(expectedType),
} }
r.setExpectedType(expectedType)
if r.name == "" {
r.name = naming.GetNameFromCallsite(internalPackages...)
}
if r.typeDescription == "" {
r.typeDescription = getTypeDescriptionFromObject(expectedType)
}
if r.expectedGVK == nil {
r.expectedGVK = getExpectedGVKFromObject(expectedType)
}
return r return r
} }
func (r *Reflector) setExpectedType(expectedType interface{}) { func getTypeDescriptionFromObject(expectedType interface{}) string {
r.expectedType = reflect.TypeOf(expectedType) if expectedType == nil {
if r.expectedType == nil { return defaultExpectedTypeName
r.expectedTypeName = defaultExpectedTypeName
return
} }
r.expectedTypeName = r.expectedType.String() reflectDescription := reflect.TypeOf(expectedType).String()
if obj, ok := expectedType.(*unstructured.Unstructured); ok { obj, ok := expectedType.(*unstructured.Unstructured)
// Use gvk to check that watch event objects are of the desired type. if !ok {
gvk := obj.GroupVersionKind() return reflectDescription
if gvk.Empty() {
klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name)
return
}
r.expectedGVK = &gvk
r.expectedTypeName = gvk.String()
} }
gvk := obj.GroupVersionKind()
if gvk.Empty() {
return reflectDescription
}
return gvk.String()
}
func getExpectedGVKFromObject(expectedType interface{}) *schema.GroupVersionKind {
obj, ok := expectedType.(*unstructured.Unstructured)
if !ok {
return nil
}
gvk := obj.GroupVersionKind()
if gvk.Empty() {
return nil
}
return &gvk
} }
// internalPackages are packages that ignored when creating a default reflector name. These packages are in the common // internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
@ -218,13 +270,13 @@ var internalPackages = []string{"client-go/tools/cache/"}
// objects and subsequent deltas. // objects and subsequent deltas.
// Run will exit when stopCh is closed. // Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) { func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
wait.BackoffUntil(func() { wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil { if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err) r.watchErrorHandler(r, err)
} }
}, r.backoffManager, true, stopCh) }, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
} }
var ( var (
@ -254,7 +306,7 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// and then use the resource version to watch. // and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch. // It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
err := r.list(stopCh) err := r.list(stopCh)
if err != nil { if err != nil {
@ -326,7 +378,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
return err return err
} }
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh) err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
retry.After(err) retry.After(err)
if err != nil { if err != nil {
if err != errorStopRequested { if err != errorStopRequested {
@ -335,16 +387,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV. // has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object. // So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
case apierrors.IsTooManyRequests(err): case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName) klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription)
<-r.initConnBackoffManager.Backoff().C() <-r.initConnBackoffManager.Backoff().C()
continue continue
case apierrors.IsInternalError(err) && retry.ShouldRetry(): case apierrors.IsInternalError(err) && retry.ShouldRetry():
klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.expectedTypeName, err) klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err)
continue continue
default: default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err)
} }
} }
return nil return nil
@ -421,8 +473,8 @@ func (r *Reflector) list(stopCh <-chan struct{}) error {
} }
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err}) initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
if err != nil { if err != nil {
klog.Warningf("%s: failed to list %v: %v", r.name, r.expectedTypeName, err) klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err)
return fmt.Errorf("failed to list %v: %w", r.expectedTypeName, err) return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
} }
// We check if the list was paginated and if so set the paginatedResult based on that. // We check if the list was paginated and if so set the paginatedResult based on that.

View File

@ -138,7 +138,7 @@ func TestReflectorWatchHandlerError(t *testing.T) {
go func() { go func() {
fw.Stop() fw.Stop()
}() }()
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop)
if err == nil { if err == nil {
t.Errorf("unexpected non-error") t.Errorf("unexpected non-error")
} }
@ -157,7 +157,7 @@ func TestReflectorWatchHandler(t *testing.T) {
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
fw.Stop() fw.Stop()
}() }()
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop) err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, wait.NeverStop)
if err != nil { if err != nil {
t.Errorf("unexpected error %v", err) t.Errorf("unexpected error %v", err)
} }
@ -205,7 +205,7 @@ func TestReflectorStopWatch(t *testing.T) {
fw := watch.NewFake() fw := watch.NewFake()
stopWatch := make(chan struct{}, 1) stopWatch := make(chan struct{}, 1)
stopWatch <- struct{}{} stopWatch <- struct{}{}
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.expectedTypeName, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch) err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc, stopWatch)
if err != errorStopRequested { if err != errorStopRequested {
t.Errorf("expected stop error, got %q", err) t.Errorf("expected stop error, got %q", err)
} }
@ -979,7 +979,7 @@ func TestReflectorFullListIfTooLarge(t *testing.T) {
} }
} }
func TestReflectorSetExpectedType(t *testing.T) { func TestGetTypeDescriptionFromObject(t *testing.T) {
obj := &unstructured.Unstructured{} obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{ gvk := schema.GroupVersionKind{
Group: "mygroup", Group: "mygroup",
@ -987,48 +987,71 @@ func TestReflectorSetExpectedType(t *testing.T) {
Kind: "MyKind", Kind: "MyKind",
} }
obj.SetGroupVersionKind(gvk) obj.SetGroupVersionKind(gvk)
testCases := map[string]struct { testCases := map[string]struct {
inputType interface{} inputType interface{}
expectedTypeName string expectedTypeDescription string
expectedType reflect.Type
expectedGVK *schema.GroupVersionKind
}{ }{
"Nil type": { "Nil type": {
expectedTypeName: defaultExpectedTypeName, expectedTypeDescription: defaultExpectedTypeName,
}, },
"Normal type": { "Normal type": {
inputType: &v1.Pod{}, inputType: &v1.Pod{},
expectedTypeName: "*v1.Pod", expectedTypeDescription: "*v1.Pod",
expectedType: reflect.TypeOf(&v1.Pod{}),
}, },
"Unstructured type without GVK": { "Unstructured type without GVK": {
inputType: &unstructured.Unstructured{}, inputType: &unstructured.Unstructured{},
expectedTypeName: "*unstructured.Unstructured", expectedTypeDescription: "*unstructured.Unstructured",
expectedType: reflect.TypeOf(&unstructured.Unstructured{}),
}, },
"Unstructured type with GVK": { "Unstructured type with GVK": {
inputType: obj, inputType: obj,
expectedTypeName: gvk.String(), expectedTypeDescription: gvk.String(),
expectedType: reflect.TypeOf(&unstructured.Unstructured{}),
expectedGVK: &gvk,
}, },
} }
for testName, tc := range testCases { for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) { t.Run(testName, func(t *testing.T) {
r := &Reflector{} typeDescription := getTypeDescriptionFromObject(tc.inputType)
r.setExpectedType(tc.inputType) if tc.expectedTypeDescription != typeDescription {
if tc.expectedType != r.expectedType { t.Fatalf("Expected typeDescription %v, got %v", tc.expectedTypeDescription, typeDescription)
t.Fatalf("Expected expectedType %v, got %v", tc.expectedType, r.expectedType)
} }
if tc.expectedTypeName != r.expectedTypeName { })
t.Fatalf("Expected expectedTypeName %v, got %v", tc.expectedTypeName, r.expectedTypeName) }
} }
gvkNotEqual := (tc.expectedGVK == nil) != (r.expectedGVK == nil)
if tc.expectedGVK != nil && r.expectedGVK != nil { func TestGetExpectedGVKFromObject(t *testing.T) {
gvkNotEqual = *tc.expectedGVK != *r.expectedGVK obj := &unstructured.Unstructured{}
gvk := schema.GroupVersionKind{
Group: "mygroup",
Version: "v1",
Kind: "MyKind",
}
obj.SetGroupVersionKind(gvk)
testCases := map[string]struct {
inputType interface{}
expectedGVK *schema.GroupVersionKind
}{
"Nil type": {},
"Some non Unstructured type": {
inputType: &v1.Pod{},
},
"Unstructured type without GVK": {
inputType: &unstructured.Unstructured{},
},
"Unstructured type with GVK": {
inputType: obj,
expectedGVK: &gvk,
},
}
for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
expectedGVK := getExpectedGVKFromObject(tc.inputType)
gvkNotEqual := (tc.expectedGVK == nil) != (expectedGVK == nil)
if tc.expectedGVK != nil && expectedGVK != nil {
gvkNotEqual = *tc.expectedGVK != *expectedGVK
} }
if gvkNotEqual { if gvkNotEqual {
t.Fatalf("Expected expectedGVK %v, got %v", tc.expectedGVK, r.expectedGVK) t.Fatalf("Expected expectedGVK %v, got %v", tc.expectedGVK, expectedGVK)
} }
}) })
} }

View File

@ -223,14 +223,26 @@ type SharedIndexInformer interface {
GetIndexer() Indexer GetIndexer() Indexer
} }
// NewSharedInformer creates a new instance for the listwatcher. // NewSharedInformer creates a new instance for the ListerWatcher. See NewSharedIndexInformerWithOptions for full details.
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer { func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{}) return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
} }
// NewSharedIndexInformer creates a new instance for the listwatcher. // NewSharedIndexInformer creates a new instance for the ListerWatcher and specified Indexers. See
// The created informer will not do resyncs if the given // NewSharedIndexInformerWithOptions for full details.
// defaultEventHandlerResyncPeriod is zero. Otherwise: for each func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
return NewSharedIndexInformerWithOptions(
lw,
exampleObject,
SharedIndexInformerOptions{
ResyncPeriod: defaultEventHandlerResyncPeriod,
Indexers: indexers,
},
)
}
// NewSharedIndexInformerWithOptions creates a new instance for the ListerWatcher.
// The created informer will not do resyncs if options.ResyncPeriod is zero. Otherwise: for each
// handler that with a non-zero requested resync period, whether added // handler that with a non-zero requested resync period, whether added
// before or after the informer starts, the nominal resync period is // before or after the informer starts, the nominal resync period is
// the requested resync period rounded up to a multiple of the // the requested resync period rounded up to a multiple of the
@ -238,21 +250,36 @@ func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEv
// checking period is established when the informer starts running, // checking period is established when the informer starts running,
// and is the maximum of (a) the minimum of the resync periods // and is the maximum of (a) the minimum of the resync periods
// requested before the informer starts and the // requested before the informer starts and the
// defaultEventHandlerResyncPeriod given here and (b) the constant // options.ResyncPeriod given here and (b) the constant
// `minimumResyncPeriod` defined in this file. // `minimumResyncPeriod` defined in this file.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
realClock := &clock.RealClock{} realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
return &sharedIndexInformer{
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
processor: &sharedProcessor{clock: realClock}, processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw, listerWatcher: lw,
objectType: exampleObject, objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod, objectDescription: options.ObjectDescription,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, resyncCheckPeriod: options.ResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), defaultEventHandlerResyncPeriod: options.ResyncPeriod,
clock: realClock, clock: realClock,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
} }
return sharedIndexInformer }
// SharedIndexInformerOptions configures a sharedIndexInformer.
type SharedIndexInformerOptions struct {
// ResyncPeriod is the default event handler resync period and resync check
// period. If unset/unspecified, these are defaulted to 0 (do not resync).
ResyncPeriod time.Duration
// Indexers is the sharedIndexInformer's indexers. If unset/unspecified, no indexers are configured.
Indexers Indexers
// ObjectDescription is the sharedIndexInformer's object description. This is passed through to the
// underlying Reflector's type description.
ObjectDescription string
} }
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. // InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
@ -326,12 +353,13 @@ type sharedIndexInformer struct {
listerWatcher ListerWatcher listerWatcher ListerWatcher
// objectType is an example object of the type this informer is // objectType is an example object of the type this informer is expected to handle. If set, an event
// expected to handle. Only the type needs to be right, except // with an object with a mismatching type is dropped instead of being delivered to listeners.
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
objectType runtime.Object objectType runtime.Object
// objectDescription is the description of this informer's objects. This typically defaults to
objectDescription string
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync. // shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time.Duration resyncCheckPeriod time.Duration
@ -425,12 +453,13 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
}) })
cfg := &Config{ cfg := &Config{
Queue: fifo, Queue: fifo,
ListerWatcher: s.listerWatcher, ListerWatcher: s.listerWatcher,
ObjectType: s.objectType, ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod, ObjectDescription: s.objectDescription,
RetryOnError: false, FullResyncPeriod: s.resyncCheckPeriod,
ShouldResync: s.processor.shouldResync, RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas, Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler, WatchErrorHandler: s.watchErrorHandler,