diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 0fd04120970..2307c1f3476 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -88,16 +88,16 @@ func IsEtcdNotFound(err error) bool { return isEtcdErrorNum(err, EtcdErrorCodeNotFound) } -// Returns true iff err is an etcd key node exists error. -func IsEtcdNodeExist(err error) bool { - return isEtcdErrorNum(err, EtcdErrorCodeNodeExist) -} - // IsEtcdTestFailed returns true iff err is an etcd write conflict. func IsEtcdTestFailed(err error) bool { return isEtcdErrorNum(err, EtcdErrorCodeTestFailed) } +// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error. +func IsEtcdNodeExist(err error) bool { + return isEtcdErrorNum(err, EtcdErrorCodeNodeExist) +} + // IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop. func IsEtcdWatchStoppedByUser(err error) bool { return etcd.ErrWatchStoppedByUser == err @@ -253,15 +253,20 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E return err } - // First time this key has been used, just set. - if index == 0 { - return h.SetObj(key, ret) - } - data, err := h.Encoding.Encode(ret) if err != nil { return err } + + // First time this key has been used, try creating new value. + if index == 0 { + _, err = h.Client.Create(key, string(data), 0) + if IsEtcdNodeExist(err) { + continue + } + return err + } + _, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index) if IsEtcdTestFailed(err) { continue diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 6c79873ee54..877a821d46e 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -19,6 +19,7 @@ package tools import ( "fmt" "reflect" + "sync" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -226,7 +227,7 @@ func TestAtomicUpdate(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") - obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1} + obj := &TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1} err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) { return obj, nil }) @@ -242,7 +243,6 @@ func TestAtomicUpdate(t *testing.T) { if expect != got { t.Errorf("Wanted %v, got %v", expect, got) } - return // Update an existing node. callbackCalled := false @@ -274,6 +274,57 @@ func TestAtomicUpdate(t *testing.T) { } } +func TestAtomicUpdate_CreateCollision(t *testing.T) { + fakeClient := MakeFakeEtcdClient(t) + fakeClient.TestIndex = true + encoding := scheme + helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}} + + fakeClient.ExpectNotFoundGet("/some/key") + + const concurrency = 10 + var wgDone sync.WaitGroup + var wgForceCollision sync.WaitGroup + wgDone.Add(concurrency) + wgForceCollision.Add(concurrency) + + for i := 0; i < concurrency; i++ { + // Increment TestResource.Value by 1 + go func() { + defer wgDone.Done() + + firstCall := true + err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) { + defer func() { firstCall = false }() + + if firstCall { + // Force collision by joining all concurrent AtomicUpdate operations here. + wgForceCollision.Done() + wgForceCollision.Wait() + } + + currValue := in.(*TestResource).Value + obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: currValue + 1} + return obj, nil + }) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + }() + } + wgDone.Wait() + + // Check that stored TestResource has received all updates. + body := fakeClient.Data["/some/key"].R.Node.Value + stored := &TestResource{} + if err := encoding.DecodeInto([]byte(body), stored); err != nil { + t.Errorf("Error decoding stored value: %v", body) + } + if stored.Value != concurrency { + t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value) + } +} + func TestWatchInterpretation_ListAdd(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index feac29b72f7..39a0c320783 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -19,6 +19,7 @@ package tools import ( "errors" "fmt" + "sync" "github.com/coreos/go-etcd/etcd" ) @@ -40,11 +41,12 @@ type FakeEtcdClient struct { Data map[string]EtcdResponseWithError DeletedKeys []string expectNotFoundGetSet map[string]struct{} - Err error - t TestLogger - Ix int - TestIndex bool - ChangeIndex uint64 + sync.Mutex + Err error + t TestLogger + Ix int + TestIndex bool + ChangeIndex uint64 // Will become valid after Watch is called; tester may write to it. Tester may // also read from it to verify that it's closed after injecting an error. @@ -89,11 +91,17 @@ func (f *FakeEtcdClient) generateIndex() uint64 { } func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) { + f.Mutex.Lock() + defer f.Mutex.Unlock() + f.Ix = f.Ix + 1 - return f.Set(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl) + return f.setLocked(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl) } func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) { + f.Mutex.Lock() + defer f.Mutex.Unlock() + result := f.Data[key] if result.R == nil { if _, ok := f.expectNotFoundGetSet[key]; !ok { @@ -110,7 +118,7 @@ func (f *FakeEtcdClient) nodeExists(key string) bool { return ok && result.R != nil && result.R.Node != nil } -func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) { +func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Response, error) { if f.Err != nil { return nil, f.Err } @@ -146,6 +154,13 @@ func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, err return result.R, nil } +func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) { + f.Mutex.Lock() + defer f.Mutex.Unlock() + + return f.setLocked(key, value, ttl) +} + func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) { if f.Err != nil { return nil, f.Err @@ -160,6 +175,9 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue return nil, errors.New("Either prevValue or prevIndex must be specified.") } + f.Mutex.Lock() + defer f.Mutex.Unlock() + if !f.nodeExists(key) { return nil, EtcdErrorNotFound } @@ -174,15 +192,18 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue return nil, EtcdErrorTestFailed } - return f.Set(key, value, ttl) + return f.setLocked(key, value, ttl) } func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) { + f.Mutex.Lock() + defer f.Mutex.Unlock() + if f.nodeExists(key) { return nil, EtcdErrorNodeExist } - return f.Set(key, value, ttl) + return f.setLocked(key, value, ttl) } func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {