From 62055090b491e971bf24b69ef54a83ca0f93b8fd Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Mon, 30 Jun 2014 14:48:57 -0700 Subject: [PATCH 01/10] fix data races in controller --- pkg/controller/replication_controller_test.go | 18 +++++------ pkg/util/fake_etcd_client.go | 32 +++++++++++++++++-- 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index 6a41ddf42e9..dd6a5be6fac 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -435,17 +435,18 @@ func TestWatchControllers(t *testing.T) { fakeEtcd := util.MakeFakeEtcdClient(t) manager := MakeReplicationManager(fakeEtcd, nil) var testControllerSpec api.ReplicationController - receivedCount := 0 + received := make(chan bool) manager.syncHandler = func(controllerSpec api.ReplicationController) error { if !reflect.DeepEqual(controllerSpec, testControllerSpec) { t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec) } - receivedCount++ + close(received) return nil } go manager.watchControllers() - time.Sleep(10 * time.Millisecond) + + fakeEtcd.WaitToWatch() // Test normal case testControllerSpec.ID = "foo" @@ -456,14 +457,14 @@ func TestWatchControllers(t *testing.T) { }, } - time.Sleep(10 * time.Millisecond) - if receivedCount != 1 { - t.Errorf("Expected 1 call but got %v", receivedCount) + select { + case <-received: + case <-time.After(10 * time.Millisecond): + t.Errorf("Expected 1 call but got 0") } // Test error case fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error") - time.Sleep(10 * time.Millisecond) // Did everything shut down? 
if _, open := <-fakeEtcd.WatchResponse; open { @@ -472,9 +473,8 @@ func TestWatchControllers(t *testing.T) { // Test purposeful shutdown go manager.watchControllers() - time.Sleep(10 * time.Millisecond) + fakeEtcd.WaitToWatch() fakeEtcd.WatchStop <- true - time.Sleep(10 * time.Millisecond) // Did everything shut down? if _, open := <-fakeEtcd.WatchResponse; open { diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index 9fddd076695..af111cb822b 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -18,6 +18,7 @@ package util import ( "fmt" + "sync" "testing" "github.com/coreos/go-etcd/etcd" @@ -29,6 +30,11 @@ type EtcdResponseWithError struct { } type FakeEtcdClient struct { + // chanLock guards watchChanReady. + chanLock sync.Mutex + // watchChanReady is readable when all public channels are ready + watchChanReady chan bool + Data map[string]EtcdResponseWithError DeletedKeys []string Err error @@ -45,8 +51,9 @@ type FakeEtcdClient struct { func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { return &FakeEtcdClient{ - t: t, - Data: map[string]EtcdResponseWithError{}, + t: t, + Data: map[string]EtcdResponseWithError{}, + watchChanReady: make(chan bool), } } @@ -97,16 +104,37 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err return &etcd.Response{}, f.Err } +func (f *FakeEtcdClient) WaitToWatch() { + f.chanLock.Lock() + defer f.chanLock.Unlock() + <-f.watchChanReady +} + func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { f.WatchResponse = receiver f.WatchStop = stop injectedError := make(chan error) + defer close(injectedError) f.WatchInjectError = injectedError + + // close the channel indicating all channels of the fake client are + // ready. 
+ close(f.watchChanReady) + + defer func() { + f.chanLock.Lock() + defer f.chanLock.Unlock() + // A stop will make f.WatchStop close, which in turn makes it un-write-able. + // Need to have another call on Watch() to make it ready. + f.watchChanReady = make(chan bool) + }() + select { case <-stop: return nil, etcd.ErrWatchStoppedByUser case err := <-injectedError: + f.watchChanReady = make(chan bool) return nil, err } // Never get here. From fa1fbe88c156287c35bd63a1a98603c95e59567e Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Mon, 30 Jun 2014 15:32:55 -0700 Subject: [PATCH 02/10] use wait group --- pkg/util/fake_etcd_client.go | 38 ++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index af111cb822b..3ad9997be3b 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -30,10 +30,7 @@ type EtcdResponseWithError struct { } type FakeEtcdClient struct { - // chanLock guards watchChanReady. 
- chanLock sync.Mutex - // watchChanReady is readable when all public channels are ready - watchChanReady chan bool + nrUnreadyChannels sync.WaitGroup Data map[string]EtcdResponseWithError DeletedKeys []string @@ -50,11 +47,17 @@ type FakeEtcdClient struct { } func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { - return &FakeEtcdClient{ - t: t, - Data: map[string]EtcdResponseWithError{}, - watchChanReady: make(chan bool), + ret := &FakeEtcdClient{ + t: t, + Data: map[string]EtcdResponseWithError{}, + // watchChanReady: make(chan bool), } + // There are 3 channels that are not ready: + // - WatchResponse chan *etcd.Response + // - WatchInjectError chan<- error + // - WatchStop chan<- bool + ret.nrUnreadyChannels.Add(3) + return ret } func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) { @@ -105,36 +108,29 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err } func (f *FakeEtcdClient) WaitToWatch() { - f.chanLock.Lock() - defer f.chanLock.Unlock() - <-f.watchChanReady + f.nrUnreadyChannels.Wait() } func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { f.WatchResponse = receiver + f.nrUnreadyChannels.Done() f.WatchStop = stop + f.nrUnreadyChannels.Done() injectedError := make(chan error) defer close(injectedError) f.WatchInjectError = injectedError - - // close the channel indicating all channels of the fake client are - // ready. - close(f.watchChanReady) + f.nrUnreadyChannels.Done() defer func() { - f.chanLock.Lock() - defer f.chanLock.Unlock() - // A stop will make f.WatchStop close, which in turn makes it un-write-able. - // Need to have another call on Watch() to make it ready. 
- f.watchChanReady = make(chan bool) + // After calling this function, the WatchStop channel will not be ready + f.nrUnreadyChannels.Add(3) }() select { case <-stop: return nil, etcd.ErrWatchStoppedByUser case err := <-injectedError: - f.watchChanReady = make(chan bool) return nil, err } // Never get here. From 014165ded6c590525a65950f10b22f1463252b77 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Mon, 30 Jun 2014 15:39:03 -0700 Subject: [PATCH 03/10] style --- pkg/util/fake_etcd_client.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index 3ad9997be3b..f4958c60b4e 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -122,10 +122,8 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, f.WatchInjectError = injectedError f.nrUnreadyChannels.Done() - defer func() { - // After calling this function, the WatchStop channel will not be ready - f.nrUnreadyChannels.Add(3) - }() + // After calling this function, the WatchStop channel will not be ready + defer f.nrUnreadyChannels.Add(3) select { case <-stop: From f13f1a5da6852313353b074c8c200a1b653d10be Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Mon, 30 Jun 2014 16:07:46 -0700 Subject: [PATCH 04/10] use sync.Cond --- pkg/util/fake_etcd_client.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index f4958c60b4e..a0435a8a2ba 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -30,7 +30,8 @@ type EtcdResponseWithError struct { } type FakeEtcdClient struct { - nrUnreadyChannels sync.WaitGroup + condChannelsReady *sync.Cond + condLock sync.Mutex Data map[string]EtcdResponseWithError DeletedKeys []string @@ -52,11 +53,8 @@ func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { Data: map[string]EtcdResponseWithError{}, // watchChanReady: make(chan bool), } 
- // There are 3 channels that are not ready: - // - WatchResponse chan *etcd.Response - // - WatchInjectError chan<- error - // - WatchStop chan<- bool - ret.nrUnreadyChannels.Add(3) + // The channels are not ready yet + ret.condChannelsReady = sync.NewCond(&ret.condLock) return ret } @@ -108,22 +106,29 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err } func (f *FakeEtcdClient) WaitToWatch() { - f.nrUnreadyChannels.Wait() + f.condLock.Lock() + defer f.condLock.Unlock() + f.condChannelsReady.Wait() } func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { f.WatchResponse = receiver - f.nrUnreadyChannels.Done() f.WatchStop = stop - f.nrUnreadyChannels.Done() injectedError := make(chan error) defer close(injectedError) f.WatchInjectError = injectedError - f.nrUnreadyChannels.Done() + + f.condLock.Lock() + f.condChannelsReady.Broadcast() + f.condLock.Unlock() // After calling this function, the WatchStop channel will not be ready - defer f.nrUnreadyChannels.Add(3) + defer func() { + f.condLock.Lock() + f.condChannelsReady = sync.NewCond(&f.condLock) + f.condLock.Unlock() + }() select { case <-stop: From 905c6dcb1053bef3a29cf6bd9c94e825cbd68e35 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Mon, 30 Jun 2014 16:09:15 -0700 Subject: [PATCH 05/10] gofmt -r "WaitToWatch->WaitForWatchCompletion" --- pkg/controller/replication_controller_test.go | 4 ++-- pkg/util/fake_etcd_client.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index dd6a5be6fac..e83cf695eea 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -446,7 +446,7 @@ func TestWatchControllers(t *testing.T) { go manager.watchControllers() - fakeEtcd.WaitToWatch() + fakeEtcd.WaitForWatchCompletion() // Test normal 
case testControllerSpec.ID = "foo" @@ -473,7 +473,7 @@ func TestWatchControllers(t *testing.T) { // Test purposeful shutdown go manager.watchControllers() - fakeEtcd.WaitToWatch() + fakeEtcd.WaitForWatchCompletion() fakeEtcd.WatchStop <- true // Did everything shut down? diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index a0435a8a2ba..653a919e46d 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -105,7 +105,7 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err return &etcd.Response{}, f.Err } -func (f *FakeEtcdClient) WaitToWatch() { +func (f *FakeEtcdClient) WaitForWatchCompletion() { f.condLock.Lock() defer f.condLock.Unlock() f.condChannelsReady.Wait() From 7b432eac5c6530c60c0cca3fc74698af44e3c87e Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Mon, 30 Jun 2014 18:05:01 -0700 Subject: [PATCH 06/10] gofmt -r "condChannelsReady->condWatchCompleted" --- pkg/util/fake_etcd_client.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index 653a919e46d..68eb2c09d59 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -30,8 +30,8 @@ type EtcdResponseWithError struct { } type FakeEtcdClient struct { - condChannelsReady *sync.Cond - condLock sync.Mutex + condWatchCompleted *sync.Cond + condLock sync.Mutex Data map[string]EtcdResponseWithError DeletedKeys []string @@ -54,7 +54,7 @@ func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { // watchChanReady: make(chan bool), } // The channels are not ready yet - ret.condChannelsReady = sync.NewCond(&ret.condLock) + ret.condWatchCompleted = sync.NewCond(&ret.condLock) return ret } @@ -108,7 +108,7 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err func (f *FakeEtcdClient) WaitForWatchCompletion() { f.condLock.Lock() defer f.condLock.Unlock() - f.condChannelsReady.Wait() + 
f.condWatchCompleted.Wait() } func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { @@ -120,13 +120,13 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, f.WatchInjectError = injectedError f.condLock.Lock() - f.condChannelsReady.Broadcast() + f.condWatchCompleted.Broadcast() f.condLock.Unlock() // After calling this function, the WatchStop channel will not be ready defer func() { f.condLock.Lock() - f.condChannelsReady = sync.NewCond(&f.condLock) + f.condWatchCompleted = sync.NewCond(&f.condLock) f.condLock.Unlock() }() From 101806cb5e152a92e84430d566e04b19146cf2f4 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Mon, 30 Jun 2014 18:27:41 -0700 Subject: [PATCH 07/10] do not recreate --- pkg/util/fake_etcd_client.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index 68eb2c09d59..c7f3160f06a 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -123,13 +123,6 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, f.condWatchCompleted.Broadcast() f.condLock.Unlock() - // After calling this function, the WatchStop channel will not be ready - defer func() { - f.condLock.Lock() - f.condWatchCompleted = sync.NewCond(&f.condLock) - f.condLock.Unlock() - }() - select { case <-stop: return nil, etcd.ErrWatchStoppedByUser From f68446fed93b8a57a96eb06667d9c04d8a6951a7 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Mon, 30 Jun 2014 20:51:38 -0700 Subject: [PATCH 08/10] comment --- pkg/util/fake_etcd_client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index c7f3160f06a..f21a5c17e1e 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -51,9 +51,8 @@ func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { ret := 
&FakeEtcdClient{ t: t, Data: map[string]EtcdResponseWithError{}, - // watchChanReady: make(chan bool), } - // The channels are not ready yet + // Watch() method has not been called. ret.condWatchCompleted = sync.NewCond(&ret.condLock) return ret } From 7f9d66525a321bc5ebfa9665f6fe85d2a45f9c15 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Tue, 1 Jul 2014 11:13:05 -0700 Subject: [PATCH 09/10] remove lock for broadcast --- pkg/util/fake_etcd_client.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index f21a5c17e1e..df6ac54c767 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -118,9 +118,7 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, defer close(injectedError) f.WatchInjectError = injectedError - f.condLock.Lock() f.condWatchCompleted.Broadcast() - f.condLock.Unlock() select { case <-stop: From 44935c2f946e4683bf920a05f8e9eec9de1ca991 Mon Sep 17 00:00:00 2001 From: Nan Deng Date: Tue, 1 Jul 2014 11:21:17 -0700 Subject: [PATCH 10/10] comment --- pkg/util/fake_etcd_client.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index df6ac54c767..635d6236191 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -52,7 +52,18 @@ func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { t: t, Data: map[string]EtcdResponseWithError{}, } - // Watch() method has not been called. + // There are three publicly accessible channels in FakeEtcdClient: + // - WatchResponse + // - WatchInjectError + // - WatchStop + // They are only available when Watch() is called. If users of + // FakeEtcdClient want to use any of these channels, they have to call + // WaitForWatchCompletion before any operation on these channels. + // Internally, FakeEtcdClient use condWatchCompleted to indicate if the + // Watch() method has been called. 
WaitForWatchCompletion() will wait + // on condWatchCompleted. Once Watch() has set up these channels, it + // calls Broadcast() on condWatchCompleted, which wakes any + // goroutine waiting on this condition. ret.condWatchCompleted = sync.NewCond(&ret.condLock) return ret }