Merge pull request #117870 from howardjohn/client/run-errorhandler-race

informer: fix race against Run and SetTransform/SetWatchErrorHandler

Kubernetes-commit: decf1e1a9b53a4f4b24bb780d6433e2d5e959d83
This commit is contained in:
Kubernetes Publisher 2023-06-01 15:41:45 -07:00
commit 4485c4ecbb
2 changed files with 46 additions and 18 deletions

View File

@ -459,29 +459,30 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
return
}
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
ObjectDescription: s.objectDescription,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
Transformer: s.transform,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
ObjectDescription: s.objectDescription,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true

View File

@ -393,6 +393,33 @@ func TestSharedInformerErrorHandling(t *testing.T) {
close(stop)
}
// TestSharedInformerStartRace is a regression test to ensure there is no race between
// Run and SetWatchErrorHandler, and Run and SetTransform.
func TestSharedInformerStartRace(t *testing.T) {
source := fcache.NewFakeControllerSource()
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
stop := make(chan struct{})
go func() {
for {
select {
case <-stop:
return
default:
}
// Set dummy functions, just to test for race
informer.SetTransform(func(i interface{}) (interface{}, error) {
return i, nil
})
informer.SetWatchErrorHandler(func(r *Reflector, err error) {
})
}
}()
go informer.Run(stop)
close(stop)
}
func TestSharedInformerTransformer(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()