diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index a889fdbc..be8694dd 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -459,29 +459,30 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed") return } - fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KnownObjects: s.indexer, - EmitDeltaTypeReplaced: true, - Transformer: s.transform, - }) - - cfg := &Config{ - Queue: fifo, - ListerWatcher: s.listerWatcher, - ObjectType: s.objectType, - ObjectDescription: s.objectDescription, - FullResyncPeriod: s.resyncCheckPeriod, - RetryOnError: false, - ShouldResync: s.processor.shouldResync, - - Process: s.HandleDeltas, - WatchErrorHandler: s.watchErrorHandler, - } func() { s.startedLock.Lock() defer s.startedLock.Unlock() + fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: s.indexer, + EmitDeltaTypeReplaced: true, + Transformer: s.transform, + }) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: s.listerWatcher, + ObjectType: s.objectType, + ObjectDescription: s.objectDescription, + FullResyncPeriod: s.resyncCheckPeriod, + RetryOnError: false, + ShouldResync: s.processor.shouldResync, + + Process: s.HandleDeltas, + WatchErrorHandler: s.watchErrorHandler, + } + s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 30776b29..459f257f 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -393,6 +393,33 @@ func TestSharedInformerErrorHandling(t *testing.T) { close(stop) } +// TestSharedInformerStartRace is a regression test to ensure there is no race between +// Run and SetWatchErrorHandler, and Run and SetTransform. +func TestSharedInformerStartRace(t *testing.T) { + source := fcache.NewFakeControllerSource() + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + stop := make(chan struct{}) + go func() { + for { + select { + case <-stop: + return + default: + } + // Set dummy functions, just to test for race + informer.SetTransform(func(i interface{}) (interface{}, error) { + return i, nil + }) + informer.SetWatchErrorHandler(func(r *Reflector, err error) { + }) + } + }() + + go informer.Run(stop) + + close(stop) +} + func TestSharedInformerTransformer(t *testing.T) { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource()