diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index 6a41ddf42e9..e83cf695eea 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -435,17 +435,18 @@ func TestWatchControllers(t *testing.T) { fakeEtcd := util.MakeFakeEtcdClient(t) manager := MakeReplicationManager(fakeEtcd, nil) var testControllerSpec api.ReplicationController - receivedCount := 0 + received := make(chan bool) manager.syncHandler = func(controllerSpec api.ReplicationController) error { if !reflect.DeepEqual(controllerSpec, testControllerSpec) { t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec) } - receivedCount++ + close(received) return nil } go manager.watchControllers() - time.Sleep(10 * time.Millisecond) + + fakeEtcd.WaitForWatchCompletion() // Test normal case testControllerSpec.ID = "foo" @@ -456,14 +457,14 @@ func TestWatchControllers(t *testing.T) { }, } - time.Sleep(10 * time.Millisecond) - if receivedCount != 1 { - t.Errorf("Expected 1 call but got %v", receivedCount) + select { + case <-received: + case <-time.After(10 * time.Millisecond): + t.Errorf("Expected 1 call but got 0") } // Test error case fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error") - time.Sleep(10 * time.Millisecond) // Did everything shut down? if _, open := <-fakeEtcd.WatchResponse; open { @@ -472,9 +473,8 @@ func TestWatchControllers(t *testing.T) { // Test purposeful shutdown go manager.watchControllers() - time.Sleep(10 * time.Millisecond) + fakeEtcd.WaitForWatchCompletion() fakeEtcd.WatchStop <- true - time.Sleep(10 * time.Millisecond) // Did everything shut down? if _, open := <-fakeEtcd.WatchResponse; open { diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index 9fddd076695..635d6236191 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -18,6 +18,7 @@ package util import ( "fmt" + "sync" "testing" "github.com/coreos/go-etcd/etcd" @@ -29,6 +30,9 @@ type EtcdResponseWithError struct { } type FakeEtcdClient struct { + condWatchCompleted *sync.Cond + condLock sync.Mutex + Data map[string]EtcdResponseWithError DeletedKeys []string Err error @@ -44,10 +48,24 @@ type FakeEtcdClient struct { } func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { - return &FakeEtcdClient{ + ret := &FakeEtcdClient{ t: t, Data: map[string]EtcdResponseWithError{}, } + // There are three publicly accessible channels in FakeEtcdClient: + // - WatchResponse + // - WatchInjectError + // - WatchStop + // They are only available when Watch() is called. If users of + // FakeEtcdClient want to use any of these channels, they have to call + // WaitForWatchCompletion before any operation on these channels. + // Internally, FakeEtcdClient use condWatchCompleted to indicate if the + // Watch() method has been called. WaitForWatchCompletion() will wait + // on condWatchCompleted. By the end of the Watch() method, it will + // call Broadcast() on condWatchCompleted, which will awake any + // goroutine waiting on this condition. + ret.condWatchCompleted = sync.NewCond(&ret.condLock) + return ret } func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) { @@ -97,12 +115,22 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err return &etcd.Response{}, f.Err } +func (f *FakeEtcdClient) WaitForWatchCompletion() { + f.condLock.Lock() + defer f.condLock.Unlock() + f.condWatchCompleted.Wait() +} + func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { f.WatchResponse = receiver f.WatchStop = stop injectedError := make(chan error) + defer close(injectedError) f.WatchInjectError = injectedError + + f.condWatchCompleted.Broadcast() + select { case <-stop: return nil, etcd.ErrWatchStoppedByUser