diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go index e4f033f8ae2..4450c8e337b 100644 --- a/federation/pkg/federation-controller/util/test/test_helper.go +++ b/federation/pkg/federation-controller/util/test/test_helper.go @@ -44,7 +44,7 @@ func (wd *WatcherDispatcher) register(watcher *watch.FakeWatcher) { defer wd.Unlock() wd.watchers = append(wd.watchers, watcher) for _, event := range wd.eventsSoFar { - go watcher.Action(event.Type, event.Object) + watcher.Action(event.Type, event.Object) } } @@ -52,13 +52,11 @@ func (wd *WatcherDispatcher) register(watcher *watch.FakeWatcher) { func (wd *WatcherDispatcher) Add(obj runtime.Object) { wd.Lock() defer wd.Unlock() - event := &watch.Event{ - Type: watch.Added, - Object: obj, - } - wd.eventsSoFar = append(wd.eventsSoFar, event) + wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Added, Object: obj}) for _, watcher := range wd.watchers { - go watcher.Add(obj) + if !watcher.IsStopped() { + watcher.Add(obj) + } } } @@ -66,13 +64,11 @@ func (wd *WatcherDispatcher) Add(obj runtime.Object) { func (wd *WatcherDispatcher) Modify(obj runtime.Object) { wd.Lock() defer wd.Unlock() - event := &watch.Event{ - Type: watch.Modified, - Object: obj, - } - wd.eventsSoFar = append(wd.eventsSoFar, event) + wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Modified, Object: obj}) for _, watcher := range wd.watchers { - go watcher.Modify(obj) + if !watcher.IsStopped() { + watcher.Modify(obj) + } } } @@ -80,13 +76,11 @@ func (wd *WatcherDispatcher) Modify(obj runtime.Object) { func (wd *WatcherDispatcher) Delete(lastValue runtime.Object) { wd.Lock() defer wd.Unlock() - event := &watch.Event{ - Type: watch.Deleted, - Object: lastValue, - } - wd.eventsSoFar = append(wd.eventsSoFar, event) + wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Deleted, Object: lastValue}) for _, watcher := range wd.watchers { - go watcher.Delete(lastValue) + if !watcher.IsStopped() { + watcher.Delete(lastValue) + } } } @@ -94,13 +88,11 @@ func (wd *WatcherDispatcher) Delete(lastValue runtime.Object) { func (wd *WatcherDispatcher) Error(errValue runtime.Object) { wd.Lock() defer wd.Unlock() - event := &watch.Event{ - Type: watch.Error, - Object: errValue, - } - wd.eventsSoFar = append(wd.eventsSoFar, event) + wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Error, Object: errValue}) for _, watcher := range wd.watchers { - go watcher.Error(errValue) + if !watcher.IsStopped() { + watcher.Error(errValue) + } } } @@ -108,13 +100,11 @@ func (wd *WatcherDispatcher) Error(errValue runtime.Object) { func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object) { wd.Lock() defer wd.Unlock() - event := &watch.Event{ - Type: action, - Object: obj, - } - wd.eventsSoFar = append(wd.eventsSoFar, event) + wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: action, Object: obj}) for _, watcher := range wd.watchers { - go watcher.Action(action, obj) + if !watcher.IsStopped() { + watcher.Action(action, obj) + } } } @@ -127,7 +117,7 @@ func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher { } client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { - watcher := watch.NewFake() + watcher := watch.NewFakeWithChanSize(100) dispatcher.register(watcher) return true, watcher, nil }) diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index 96b2fe3de28..90125ac6dd2 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -89,6 +89,12 @@ func NewFake() *FakeWatcher { } } +func NewFakeWithChanSize(size int) *FakeWatcher { + return &FakeWatcher{ + result: make(chan Event, size), + } +} + // Stop implements Interface.Stop(). func (f *FakeWatcher) Stop() { f.Lock() @@ -99,6 +105,12 @@ func (f *FakeWatcher) Stop() { } } +func (f *FakeWatcher) IsStopped() bool { + f.Lock() + defer f.Unlock() + return f.Stopped +} + // Reset prepares the watcher to be reused. func (f *FakeWatcher) Reset() { f.Lock()