fix data races in controller

This commit is contained in:
Nan Deng 2014-06-30 14:48:57 -07:00
parent 655bca7e55
commit 62055090b4
2 changed files with 39 additions and 11 deletions

View File

@ -435,17 +435,18 @@ func TestWatchControllers(t *testing.T) {
fakeEtcd := util.MakeFakeEtcdClient(t) fakeEtcd := util.MakeFakeEtcdClient(t)
manager := MakeReplicationManager(fakeEtcd, nil) manager := MakeReplicationManager(fakeEtcd, nil)
var testControllerSpec api.ReplicationController var testControllerSpec api.ReplicationController
receivedCount := 0 received := make(chan bool)
manager.syncHandler = func(controllerSpec api.ReplicationController) error { manager.syncHandler = func(controllerSpec api.ReplicationController) error {
if !reflect.DeepEqual(controllerSpec, testControllerSpec) { if !reflect.DeepEqual(controllerSpec, testControllerSpec) {
t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec) t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec)
} }
receivedCount++ close(received)
return nil return nil
} }
go manager.watchControllers() go manager.watchControllers()
time.Sleep(10 * time.Millisecond)
fakeEtcd.WaitToWatch()
// Test normal case // Test normal case
testControllerSpec.ID = "foo" testControllerSpec.ID = "foo"
@ -456,14 +457,14 @@ func TestWatchControllers(t *testing.T) {
}, },
} }
time.Sleep(10 * time.Millisecond) select {
if receivedCount != 1 { case <-received:
t.Errorf("Expected 1 call but got %v", receivedCount) case <-time.After(10 * time.Millisecond):
t.Errorf("Expected 1 call but got 0")
} }
// Test error case // Test error case
fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error") fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error")
time.Sleep(10 * time.Millisecond)
// Did everything shut down? // Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open { if _, open := <-fakeEtcd.WatchResponse; open {
@ -472,9 +473,8 @@ func TestWatchControllers(t *testing.T) {
// Test purposeful shutdown // Test purposeful shutdown
go manager.watchControllers() go manager.watchControllers()
time.Sleep(10 * time.Millisecond) fakeEtcd.WaitToWatch()
fakeEtcd.WatchStop <- true fakeEtcd.WatchStop <- true
time.Sleep(10 * time.Millisecond)
// Did everything shut down? // Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open { if _, open := <-fakeEtcd.WatchResponse; open {

View File

@ -18,6 +18,7 @@ package util
import ( import (
"fmt" "fmt"
"sync"
"testing" "testing"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
@ -29,6 +30,11 @@ type EtcdResponseWithError struct {
} }
type FakeEtcdClient struct { type FakeEtcdClient struct {
// chanLock guards watchChanReady.
chanLock sync.Mutex
// watchChanReady is readable when all public channels are ready
watchChanReady chan bool
Data map[string]EtcdResponseWithError Data map[string]EtcdResponseWithError
DeletedKeys []string DeletedKeys []string
Err error Err error
@ -45,8 +51,9 @@ type FakeEtcdClient struct {
func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
return &FakeEtcdClient{ return &FakeEtcdClient{
t: t, t: t,
Data: map[string]EtcdResponseWithError{}, Data: map[string]EtcdResponseWithError{},
watchChanReady: make(chan bool),
} }
} }
@ -97,16 +104,37 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err
return &etcd.Response{}, f.Err return &etcd.Response{}, f.Err
} }
func (f *FakeEtcdClient) WaitToWatch() {
f.chanLock.Lock()
defer f.chanLock.Unlock()
<-f.watchChanReady
}
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
f.WatchResponse = receiver f.WatchResponse = receiver
f.WatchStop = stop f.WatchStop = stop
injectedError := make(chan error) injectedError := make(chan error)
defer close(injectedError) defer close(injectedError)
f.WatchInjectError = injectedError f.WatchInjectError = injectedError
// close the channel indicating all channels of the fake client are
// ready.
close(f.watchChanReady)
defer func() {
f.chanLock.Lock()
defer f.chanLock.Unlock()
// A stop will make f.WatchStop close, which in turn makes it un-write-able.
// Need to have another call on Watch() to make it ready.
f.watchChanReady = make(chan bool)
}()
select { select {
case <-stop: case <-stop:
return nil, etcd.ErrWatchStoppedByUser return nil, etcd.ErrWatchStoppedByUser
case err := <-injectedError: case err := <-injectedError:
f.watchChanReady = make(chan bool)
return nil, err return nil, err
} }
// Never get here. // Never get here.