From 46cc47e324e3faa5242181b5c12949051d85c778 Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Fri, 7 Oct 2016 14:40:37 +0200 Subject: [PATCH] Race free version of FakeWatcher --- pkg/watch/watch.go | 119 +++++++++++++++++++++++++++++++++++++++- pkg/watch/watch_test.go | 47 ++++++++++++++++ 2 files changed, 165 insertions(+), 1 deletion(-) diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index 5c2e4b91507..550149c6880 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -17,6 +17,7 @@ limitations under the License. package watch import ( + "fmt" "sync" "k8s.io/kubernetes/pkg/runtime" @@ -44,6 +45,8 @@ const ( Modified EventType = "MODIFIED" Deleted EventType = "DELETED" Error EventType = "ERROR" + + DefaultChanSize int32 = 100 ) // Event represents a single event to a watched resource. @@ -91,7 +94,7 @@ func NewFake() *FakeWatcher { } } -func NewFakeWithChanSize(size int) *FakeWatcher { +func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher { return &FakeWatcher{ result: make(chan Event, size), } @@ -150,3 +153,117 @@ func (f *FakeWatcher) Error(errValue runtime.Object) { func (f *FakeWatcher) Action(action EventType, obj runtime.Object) { f.result <- Event{action, obj} } + +// RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe. +type RaceFreeFakeWatcher struct { + result chan Event + Stopped bool + sync.Mutex +} + +func NewRaceFreeFake() *RaceFreeFakeWatcher { + return &RaceFreeFakeWatcher{ + result: make(chan Event, DefaultChanSize), + } +} + +// Stop implements Interface.Stop(). +func (f *RaceFreeFakeWatcher) Stop() { + f.Lock() + defer f.Unlock() + if !f.Stopped { + glog.V(4).Infof("Stopping fake watcher.") + close(f.result) + f.Stopped = true + } +} + +func (f *RaceFreeFakeWatcher) IsStopped() bool { + f.Lock() + defer f.Unlock() + return f.Stopped +} + +// Reset prepares the watcher to be reused. +func (f *RaceFreeFakeWatcher) Reset() { + f.Lock() + defer f.Unlock() + f.Stopped = false + f.result = make(chan Event, DefaultChanSize) +} + +func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event { + f.Lock() + defer f.Unlock() + return f.result +} + +// Add sends an add event. +func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Added, obj}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Modify sends a modify event. +func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Modified, obj}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Delete sends a delete event. +func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Deleted, lastValue}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Error sends an Error event. +func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{Error, errValue}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} + +// Action sends an event of the requested type, for table-based testing. +func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) { + f.Lock() + defer f.Unlock() + if !f.Stopped { + select { + case f.result <- Event{action, obj}: + return + default: + panic(fmt.Errorf("channel full")) + } + } +} diff --git a/pkg/watch/watch_test.go b/pkg/watch/watch_test.go index d58a24885ad..3569af55ea6 100644 --- a/pkg/watch/watch_test.go +++ b/pkg/watch/watch_test.go @@ -73,6 +73,53 @@ func TestFake(t *testing.T) { consumer(f) } +func TestRaceFreeFake(t *testing.T) { + f := NewRaceFreeFake() + + table := []struct { + t EventType + s testType + }{ + {Added, testType("foo")}, + {Modified, testType("qux")}, + {Modified, testType("bar")}, + {Deleted, testType("bar")}, + {Error, testType("error: blah")}, + } + + // Prove that f implements Interface by phrasing this as a function. + consumer := func(w Interface) { + for _, expect := range table { + got, ok := <-w.ResultChan() + if !ok { + t.Fatalf("closed early") + } + if e, a := expect.t, got.Type; e != a { + t.Fatalf("Expected %v, got %v", e, a) + } + if a, ok := got.Object.(testType); !ok || a != expect.s { + t.Fatalf("Expected %v, got %v", expect.s, a) + } + } + _, stillOpen := <-w.ResultChan() + if stillOpen { + t.Fatal("Never stopped") + } + } + + sender := func() { + f.Add(testType("foo")) + f.Action(Modified, testType("qux")) + f.Modify(testType("bar")) + f.Delete(testType("bar")) + f.Error(testType("error: blah")) + f.Stop() + } + + go sender() + consumer(f) +} + func TestEmpty(t *testing.T) { w := NewEmptyWatch() _, ok := <-w.ResultChan()