From 62055090b491e971bf24b69ef54a83ca0f93b8fd Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Mon, 30 Jun 2014 14:48:57 -0700 Subject: [PATCH] fix data races in controller --- pkg/controller/replication_controller_test.go | 18 +++++------ pkg/util/fake_etcd_client.go | 32 +++++++++++++++++-- 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index 6a41ddf42e9..dd6a5be6fac 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -435,17 +435,18 @@ func TestWatchControllers(t *testing.T) { fakeEtcd := util.MakeFakeEtcdClient(t) manager := MakeReplicationManager(fakeEtcd, nil) var testControllerSpec api.ReplicationController - receivedCount := 0 + received := make(chan bool) manager.syncHandler = func(controllerSpec api.ReplicationController) error { if !reflect.DeepEqual(controllerSpec, testControllerSpec) { t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec) } - receivedCount++ + close(received) return nil } go manager.watchControllers() - time.Sleep(10 * time.Millisecond) + + fakeEtcd.WaitToWatch() // Test normal case testControllerSpec.ID = "foo" @@ -456,14 +457,14 @@ func TestWatchControllers(t *testing.T) { }, } - time.Sleep(10 * time.Millisecond) - if receivedCount != 1 { - t.Errorf("Expected 1 call but got %v", receivedCount) + select { + case <-received: + case <-time.After(10 * time.Millisecond): + t.Errorf("Expected 1 call but got 0") } // Test error case fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error") - time.Sleep(10 * time.Millisecond) // Did everything shut down? if _, open := <-fakeEtcd.WatchResponse; open { @@ -472,9 +473,8 @@ func TestWatchControllers(t *testing.T) { // Test purposeful shutdown go manager.watchControllers() - time.Sleep(10 * time.Millisecond) + fakeEtcd.WaitToWatch() fakeEtcd.WatchStop <- true - time.Sleep(10 * time.Millisecond) // Did everything shut down? if _, open := <-fakeEtcd.WatchResponse; open { diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index 9fddd076695..af111cb822b 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -18,6 +18,7 @@ package util import ( "fmt" + "sync" "testing" "github.com/coreos/go-etcd/etcd" @@ -29,6 +30,11 @@ type EtcdResponseWithError struct { } type FakeEtcdClient struct { + // chanLock guards watchChanReady. + chanLock sync.Mutex + // watchChanReady is readable when all public channels are ready + watchChanReady chan bool + Data map[string]EtcdResponseWithError DeletedKeys []string Err error @@ -45,8 +51,9 @@ type FakeEtcdClient struct { func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { return &FakeEtcdClient{ - t: t, - Data: map[string]EtcdResponseWithError{}, + t: t, + Data: map[string]EtcdResponseWithError{}, + watchChanReady: make(chan bool), } } @@ -97,16 +104,37 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err return &etcd.Response{}, f.Err } +func (f *FakeEtcdClient) WaitToWatch() { + f.chanLock.Lock() + defer f.chanLock.Unlock() + <-f.watchChanReady +} + func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { f.WatchResponse = receiver f.WatchStop = stop injectedError := make(chan error) + defer close(injectedError) f.WatchInjectError = injectedError + + // close the channel indicating all channels of the fake client are + // ready. + close(f.watchChanReady) + + defer func() { + f.chanLock.Lock() + defer f.chanLock.Unlock() + // A stop will make f.WatchStop close, which in turn makes it un-write-able. + // Need to have another call on Watch() to make it ready. + f.watchChanReady = make(chan bool) + }() + select { case <-stop: return nil, etcd.ErrWatchStoppedByUser case err := <-injectedError: + f.watchChanReady = make(chan bool) return nil, err } // Never get here.