From 41c783503997dbc4df10a9934a630b74829d4bb3 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 22 Dec 2015 13:16:14 +0100 Subject: [PATCH] Fix race in watch tests --- pkg/storage/etcd/etcd_watcher.go | 11 +++++++++++ pkg/storage/etcd/etcd_watcher_test.go | 10 +--------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index cfe14f49c15..567a67ab62c 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -94,6 +94,8 @@ type etcdWatcher struct { userStop chan struct{} stopped bool stopLock sync.Mutex + // wg is used to avoid calls to etcd after Stop() + wg sync.WaitGroup // Injectable for testing. Send the event down the outgoing channel. emit func(watch.Event) @@ -129,6 +131,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e outgoing: make(chan watch.Event), userStop: make(chan struct{}), stopped: false, + wg: sync.WaitGroup{}, cache: cache, ctx: nil, cancel: nil, @@ -145,6 +148,11 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st defer close(w.etcdError) defer close(w.etcdIncoming) + // All calls to etcd are coming from this function - once it is finished + // no other call to etcd should be generated by this watcher. + w.wg.Add(1) + defer w.wg.Done() + // We need to be prepared, that Stop() can be called at any time. // It can potentially also be called, even before this function is called. // If that is the case, we simply skip all the code here. @@ -456,4 +464,7 @@ func (w *etcdWatcher) Stop() { w.stopped = true close(w.userStop) } + // Wait until all calls to etcd are finished and no other + // will be issued. + w.wg.Wait() } diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index ebebe8d3ea3..23e4a59a161 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -246,7 +246,7 @@ func TestWatch(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - defer watching.Stop() + // watching is explicitly closed below. // Test normal case pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} @@ -327,8 +327,6 @@ func TestWatchEtcdState(t *testing.T) { if e, a := endpoint, event.Object; !api.Semantic.DeepDerivative(e, a) { t.Errorf("%s: expected %v, got %v", e, a) } - - watching.Stop() } func TestWatchFromZeroIndex(t *testing.T) { @@ -379,8 +377,6 @@ func TestWatchFromZeroIndex(t *testing.T) { if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) { t.Errorf("%s: expected %v, got %v", e, a) } - - watching.Stop() } func TestWatchListFromZeroIndex(t *testing.T) { @@ -411,8 +407,6 @@ func TestWatchListFromZeroIndex(t *testing.T) { if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) { t.Errorf("%s: expected %v, got %v", e, a) } - - watching.Stop() } func TestWatchListIgnoresRootKey(t *testing.T) { @@ -444,8 +438,6 @@ func TestWatchListIgnoresRootKey(t *testing.T) { default: // fall through, expected behavior } - - watching.Stop() } func TestWatchPurposefulShutdown(t *testing.T) {