Merge pull request #309 from monnand/race-controller

fix data races in controller
This commit is contained in:
Daniel Smith 2014-07-01 11:34:05 -07:00
commit 99f85e3897
2 changed files with 38 additions and 10 deletions

View File

@ -435,17 +435,18 @@ func TestWatchControllers(t *testing.T) {
fakeEtcd := util.MakeFakeEtcdClient(t) fakeEtcd := util.MakeFakeEtcdClient(t)
manager := MakeReplicationManager(fakeEtcd, nil) manager := MakeReplicationManager(fakeEtcd, nil)
var testControllerSpec api.ReplicationController var testControllerSpec api.ReplicationController
receivedCount := 0 received := make(chan bool)
manager.syncHandler = func(controllerSpec api.ReplicationController) error { manager.syncHandler = func(controllerSpec api.ReplicationController) error {
if !reflect.DeepEqual(controllerSpec, testControllerSpec) { if !reflect.DeepEqual(controllerSpec, testControllerSpec) {
t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec) t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec)
} }
receivedCount++ close(received)
return nil return nil
} }
go manager.watchControllers() go manager.watchControllers()
time.Sleep(10 * time.Millisecond)
fakeEtcd.WaitForWatchCompletion()
// Test normal case // Test normal case
testControllerSpec.ID = "foo" testControllerSpec.ID = "foo"
@ -456,14 +457,14 @@ func TestWatchControllers(t *testing.T) {
}, },
} }
time.Sleep(10 * time.Millisecond) select {
if receivedCount != 1 { case <-received:
t.Errorf("Expected 1 call but got %v", receivedCount) case <-time.After(10 * time.Millisecond):
t.Errorf("Expected 1 call but got 0")
} }
// Test error case // Test error case
fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error") fakeEtcd.WatchInjectError <- fmt.Errorf("Injected error")
time.Sleep(10 * time.Millisecond)
// Did everything shut down? // Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open { if _, open := <-fakeEtcd.WatchResponse; open {
@ -472,9 +473,8 @@ func TestWatchControllers(t *testing.T) {
// Test purposeful shutdown // Test purposeful shutdown
go manager.watchControllers() go manager.watchControllers()
time.Sleep(10 * time.Millisecond) fakeEtcd.WaitForWatchCompletion()
fakeEtcd.WatchStop <- true fakeEtcd.WatchStop <- true
time.Sleep(10 * time.Millisecond)
// Did everything shut down? // Did everything shut down?
if _, open := <-fakeEtcd.WatchResponse; open { if _, open := <-fakeEtcd.WatchResponse; open {

View File

@ -18,6 +18,7 @@ package util
import ( import (
"fmt" "fmt"
"sync"
"testing" "testing"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
@ -29,6 +30,9 @@ type EtcdResponseWithError struct {
} }
type FakeEtcdClient struct { type FakeEtcdClient struct {
condWatchCompleted *sync.Cond
condLock sync.Mutex
Data map[string]EtcdResponseWithError Data map[string]EtcdResponseWithError
DeletedKeys []string DeletedKeys []string
Err error Err error
@ -44,10 +48,24 @@ type FakeEtcdClient struct {
} }
func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
return &FakeEtcdClient{ ret := &FakeEtcdClient{
t: t, t: t,
Data: map[string]EtcdResponseWithError{}, Data: map[string]EtcdResponseWithError{},
} }
// There are three publicly accessible channels in FakeEtcdClient:
// - WatchResponse
// - WatchInjectError
// - WatchStop
// They are only available when Watch() is called. If users of
// FakeEtcdClient want to use any of these channels, they have to call
// WaitForWatchCompletion before any operation on these channels.
// Internally, FakeEtcdClient use condWatchCompleted to indicate if the
// Watch() method has been called. WaitForWatchCompletion() will wait
// on condWatchCompleted. By the end of the Watch() method, it will
// call Broadcast() on condWatchCompleted, which will awake any
// goroutine waiting on this condition.
ret.condWatchCompleted = sync.NewCond(&ret.condLock)
return ret
} }
func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) { func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) {
@ -97,12 +115,22 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err
return &etcd.Response{}, f.Err return &etcd.Response{}, f.Err
} }
func (f *FakeEtcdClient) WaitForWatchCompletion() {
f.condLock.Lock()
defer f.condLock.Unlock()
f.condWatchCompleted.Wait()
}
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
f.WatchResponse = receiver f.WatchResponse = receiver
f.WatchStop = stop f.WatchStop = stop
injectedError := make(chan error) injectedError := make(chan error)
defer close(injectedError) defer close(injectedError)
f.WatchInjectError = injectedError f.WatchInjectError = injectedError
f.condWatchCompleted.Broadcast()
select { select {
case <-stop: case <-stop:
return nil, etcd.ErrWatchStoppedByUser return nil, etcd.ErrWatchStoppedByUser