From 46cc47e324e3faa5242181b5c12949051d85c778 Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Fri, 7 Oct 2016 14:40:37 +0200 Subject: [PATCH 1/2] Race free version of FakeWatcher --- pkg/watch/watch.go | 119 +++++++++++++++++++++++++++++++++++++++- pkg/watch/watch_test.go | 47 ++++++++++++++++ 2 files changed, 165 insertions(+), 1 deletion(-) diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index 5c2e4b91507..550149c6880 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -17,6 +17,7 @@ limitations under the License. package watch import ( + "fmt" "sync" "k8s.io/kubernetes/pkg/runtime" @@ -44,6 +45,8 @@ const ( Modified EventType = "MODIFIED" Deleted EventType = "DELETED" Error EventType = "ERROR" + + DefaultChanSize int32 = 100 ) // Event represents a single event to a watched resource. @@ -91,7 +94,7 @@ func NewFake() *FakeWatcher { } } -func NewFakeWithChanSize(size int) *FakeWatcher { +func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher { return &FakeWatcher{ result: make(chan Event, size), } @@ -150,3 +153,117 @@ func (f *FakeWatcher) Error(errValue runtime.Object) { func (f *FakeWatcher) Action(action EventType, obj runtime.Object) { f.result <- Event{action, obj} } + +// RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe. +type RaceFreeFakeWatcher struct { + result chan Event + Stopped bool + sync.Mutex +} + +func NewRaceFreeFake() *RaceFreeFakeWatcher { + return &RaceFreeFakeWatcher{ + result: make(chan Event, DefaultChanSize), + } +} + +// Stop implements Interface.Stop(). +func (f *RaceFreeFakeWatcher) Stop() { + f.Lock() + defer f.Unlock() + if !f.Stopped { + glog.V(4).Infof("Stopping fake watcher.") + close(f.result) + f.Stopped = true + } +} + +func (f *RaceFreeFakeWatcher) IsStopped() bool { + f.Lock() + defer f.Unlock() + return f.Stopped +} + +// Reset prepares the watcher to be reused. +func (f *RaceFreeFakeWatcher) Reset() { + f.Lock() + defer f.Unlock() + f.Stopped = false + f.result = make(chan Event, DefaultChanSize) +} + +func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event { + f.Lock() + defer f.Unlock() + return f.result +} + +// Add sends an add event. +func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Added, obj}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Modify sends a modify event. +func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Modified, obj}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Delete sends a delete event. +func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Deleted, lastValue}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Error sends an Error event. +func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Error, errValue}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Action sends an event of the requested type, for table-based testing. +func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{action, obj}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} diff --git a/pkg/watch/watch_test.go b/pkg/watch/watch_test.go index d58a24885ad..3569af55ea6 100644 --- a/pkg/watch/watch_test.go +++ b/pkg/watch/watch_test.go @@ -73,6 +73,53 @@ func TestFake(t *testing.T) { consumer(f) } +func TestRaceFreeFake(t *testing.T) { + f := NewRaceFreeFake() + + table := []struct { + t EventType + s testType + }{ + {Added, testType("foo")}, + {Modified, testType("qux")}, + {Modified, testType("bar")}, + {Deleted, testType("bar")}, + {Error, testType("error: blah")}, + } + + // Prove that f implements Interface by phrasing this as a function. + consumer := func(w Interface) { + for _, expect := range table { + got, ok := <-w.ResultChan() + if !ok { + t.Fatalf("closed early") + } + if e, a := expect.t, got.Type; e != a { + t.Fatalf("Expected %v, got %v", e, a) + } + if a, ok := got.Object.(testType); !ok || a != expect.s { + t.Fatalf("Expected %v, got %v", expect.s, a) + } + } + _, stillOpen := <-w.ResultChan() + if stillOpen { + t.Fatal("Never stopped") + } + } + + sender := func() { + f.Add(testType("foo")) + f.Action(Modified, testType("qux")) + f.Modify(testType("bar")) + f.Delete(testType("bar")) + f.Error(testType("error: blah")) + f.Stop() + } + + go sender() + consumer(f) +} + func TestEmpty(t *testing.T) { w := NewEmptyWatch() _, ok := <-w.ResultChan() From e033c5fe7bcfff96f00d11baf00b0d093337906c Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Fri, 7 Oct 2016 14:41:18 +0200 Subject: [PATCH 2/2] Use RaceFreeFakeWatcher in federation controllers helper and ingress controller --- .../util/test/test_helper.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go index 9fa3416173c..c3ac77c6b0e 100644 --- a/federation/pkg/federation-controller/util/test/test_helper.go +++ b/federation/pkg/federation-controller/util/test/test_helper.go @@ -37,11 +37,11 @@ import ( // A structure that distributes eventes to multiple watchers. type WatcherDispatcher struct { sync.Mutex - watchers []*watch.FakeWatcher + watchers []*watch.RaceFreeFakeWatcher eventsSoFar []*watch.Event } -func (wd *WatcherDispatcher) register(watcher *watch.FakeWatcher) { +func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) { wd.Lock() defer wd.Unlock() wd.watchers = append(wd.watchers, watcher) @@ -50,6 +50,14 @@ func (wd *WatcherDispatcher) register(watcher *watch.FakeWatcher) { } } +func (wd *WatcherDispatcher) Stop() { + wd.Lock() + defer wd.Unlock() + for _, watcher := range wd.watchers { + watcher.Stop() + } +} + func copy(obj runtime.Object) runtime.Object { objCopy, err := api.Scheme.DeepCopy(obj) if err != nil { @@ -126,12 +134,12 @@ func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object) // All subsequent requests for a watch on the client will result in returning this fake watcher. func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher { dispatcher := &WatcherDispatcher{ - watchers: make([]*watch.FakeWatcher, 0), + watchers: make([]*watch.RaceFreeFakeWatcher, 0), eventsSoFar: make([]*watch.Event, 0), } client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { - watcher := watch.NewFakeWithChanSize(100) + watcher := watch.NewRaceFreeFake() dispatcher.register(watcher) return true, watcher, nil })