Fix race in watch tests

This commit is contained in:
Wojciech Tyczynski 2015-12-22 13:16:14 +01:00
parent e108a32b08
commit 41c7835039
2 changed files with 12 additions and 9 deletions

View File

@ -94,6 +94,8 @@ type etcdWatcher struct {
userStop chan struct{} userStop chan struct{}
stopped bool stopped bool
stopLock sync.Mutex stopLock sync.Mutex
// wg is used to avoid calls to etcd after Stop()
wg sync.WaitGroup
// Injectable for testing. Send the event down the outgoing channel. // Injectable for testing. Send the event down the outgoing channel.
emit func(watch.Event) emit func(watch.Event)
@ -129,6 +131,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e
outgoing: make(chan watch.Event), outgoing: make(chan watch.Event),
userStop: make(chan struct{}), userStop: make(chan struct{}),
stopped: false, stopped: false,
wg: sync.WaitGroup{},
cache: cache, cache: cache,
ctx: nil, ctx: nil,
cancel: nil, cancel: nil,
@ -145,6 +148,11 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st
defer close(w.etcdError) defer close(w.etcdError)
defer close(w.etcdIncoming) defer close(w.etcdIncoming)
// All calls to etcd are coming from this function - once it is finished
// no other call to etcd should be generated by this watcher.
w.wg.Add(1)
defer w.wg.Done()
// We need to be prepared, that Stop() can be called at any time. // We need to be prepared, that Stop() can be called at any time.
// It can potentially also be called, even before this function is called. // It can potentially also be called, even before this function is called.
// If that is the case, we simply skip all the code here. // If that is the case, we simply skip all the code here.
@ -456,4 +464,7 @@ func (w *etcdWatcher) Stop() {
w.stopped = true w.stopped = true
close(w.userStop) close(w.userStop)
} }
// Wait until all calls to etcd are finished and no other
// will be issued.
w.wg.Wait()
} }

View File

@ -246,7 +246,7 @@ func TestWatch(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
defer watching.Stop() // watching is explicitly closed below.
// Test normal case // Test normal case
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
@ -327,8 +327,6 @@ func TestWatchEtcdState(t *testing.T) {
if e, a := endpoint, event.Object; !api.Semantic.DeepDerivative(e, a) { if e, a := endpoint, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a) t.Errorf("%s: expected %v, got %v", e, a)
} }
watching.Stop()
} }
func TestWatchFromZeroIndex(t *testing.T) { func TestWatchFromZeroIndex(t *testing.T) {
@ -379,8 +377,6 @@ func TestWatchFromZeroIndex(t *testing.T) {
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) { if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a) t.Errorf("%s: expected %v, got %v", e, a)
} }
watching.Stop()
} }
func TestWatchListFromZeroIndex(t *testing.T) { func TestWatchListFromZeroIndex(t *testing.T) {
@ -411,8 +407,6 @@ func TestWatchListFromZeroIndex(t *testing.T) {
if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) { if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%s: expected %v, got %v", e, a) t.Errorf("%s: expected %v, got %v", e, a)
} }
watching.Stop()
} }
func TestWatchListIgnoresRootKey(t *testing.T) { func TestWatchListIgnoresRootKey(t *testing.T) {
@ -444,8 +438,6 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
default: default:
// fall through, expected behavior // fall through, expected behavior
} }
watching.Stop()
} }
func TestWatchPurposefulShutdown(t *testing.T) { func TestWatchPurposefulShutdown(t *testing.T) {