informer: fix race against Run and SetTransform/SetWatchErrorHandler

`SetWatchErrorHandler` claims it will fail if Run() has already started.
But if they are called concurrently, it will actually trigger a data
race.

With this PR:
```
62702 runs so far, 0 failures (100.00% pass rate). 59.152682ms avg, 189.068387ms max, 26.623785ms min
```

Without this PR:
```
5012 runs so far, 38 failures (99.25% pass rate). 58.675502ms avg, 186.018084ms max, 29.468104ms min
```

Kubernetes-commit: 35d2431b3a89c5bd693846952e9d27ce4e3a0754
This commit is contained in:
John Howard 2023-05-08 10:11:54 -07:00 committed by Kubernetes Publisher
parent cf830e3cb3
commit 702d7378b6
2 changed files with 46 additions and 18 deletions

View File

@ -459,6 +459,11 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
return
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
@ -478,10 +483,6 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
WatchErrorHandler: s.watchErrorHandler,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true

View File

@ -393,6 +393,33 @@ func TestSharedInformerErrorHandling(t *testing.T) {
close(stop)
}
// TestSharedInformerStartRace is a regression test to ensure there is no race between
// Run and SetWatchErrorHandler, and Run and SetTransform.
func TestSharedInformerStartRace(t *testing.T) {
source := fcache.NewFakeControllerSource()
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
stop := make(chan struct{})
go func() {
for {
select {
case <-stop:
return
default:
}
// Set dummy functions, just to test for race
informer.SetTransform(func(i interface{}) (interface{}, error) {
return i, nil
})
informer.SetWatchErrorHandler(func(r *Reflector, err error) {
})
}
}()
go informer.Run(stop)
close(stop)
}
func TestSharedInformerTransformer(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()