From 702d7378b67a6afe9f6c15dc241f67ffb9907e29 Mon Sep 17 00:00:00 2001 From: John Howard Date: Mon, 8 May 2023 10:11:54 -0700 Subject: [PATCH] informer: fix race against Run and SetTransform/SetWatchErrorHandler `SetWatchErrorHandler` claims it will fail if Run() has already started. But if they are called concurrently, it will actually trigger a data race. With this PR: ``` 62702 runs so far, 0 failures (100.00% pass rate). 59.152682ms avg, 189.068387ms max, 26.623785ms min ``` Without this PR: ``` 5012 runs so far, 38 failures (99.25% pass rate). 58.675502ms avg, 186.018084ms max, 29.468104ms min ``` Kubernetes-commit: 35d2431b3a89c5bd693846952e9d27ce4e3a0754 --- tools/cache/shared_informer.go | 37 +++++++++++++++-------------- tools/cache/shared_informer_test.go | 27 +++++++++++++++++++++ 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index a889fdbc..be8694dd 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -459,29 +459,30 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed") return } - fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KnownObjects: s.indexer, - EmitDeltaTypeReplaced: true, - Transformer: s.transform, - }) - - cfg := &Config{ - Queue: fifo, - ListerWatcher: s.listerWatcher, - ObjectType: s.objectType, - ObjectDescription: s.objectDescription, - FullResyncPeriod: s.resyncCheckPeriod, - RetryOnError: false, - ShouldResync: s.processor.shouldResync, - - Process: s.HandleDeltas, - WatchErrorHandler: s.watchErrorHandler, - } func() { s.startedLock.Lock() defer s.startedLock.Unlock() + fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: s.indexer, + EmitDeltaTypeReplaced: true, + Transformer: s.transform, + }) + + cfg := &Config{ + Queue: fifo, + ListerWatcher: s.listerWatcher, + ObjectType: s.objectType, + ObjectDescription: s.objectDescription, + FullResyncPeriod: s.resyncCheckPeriod, + RetryOnError: false, + ShouldResync: s.processor.shouldResync, + + Process: s.HandleDeltas, + WatchErrorHandler: s.watchErrorHandler, + } + s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 30776b29..459f257f 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -393,6 +393,33 @@ func TestSharedInformerErrorHandling(t *testing.T) { close(stop) } +// TestSharedInformerStartRace is a regression test to ensure there is no race between +// Run and SetWatchErrorHandler, and Run and SetTransform. +func TestSharedInformerStartRace(t *testing.T) { + source := fcache.NewFakeControllerSource() + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + stop := make(chan struct{}) + go func() { + for { + select { + case <-stop: + return + default: + } + // Set dummy functions, just to test for race + informer.SetTransform(func(i interface{}) (interface{}, error) { + return i, nil + }) + informer.SetWatchErrorHandler(func(r *Reflector, err error) { + }) + } + }() + + go informer.Run(stop) + + close(stop) +} + func TestSharedInformerTransformer(t *testing.T) { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource()