From b3e5f40988124cc4a1c03e91cd96f8d685d4d854 Mon Sep 17 00:00:00 2001 From: Kouhei Ueno Date: Mon, 4 Aug 2014 00:29:59 +0900 Subject: [PATCH 1/4] Fix bug where debug "return" prevented whole TestAtomicUpdate run --- pkg/tools/etcd_tools_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 58c19bb530c..7d52889a886 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -243,7 +243,6 @@ func TestAtomicUpdate(t *testing.T) { if expect != got { t.Errorf("Wanted %v, got %v", expect, got) } - return // Update an existing node. callbackCalled := false From 3f9ec452e48633a404c7c72ffded07f6255b7ace Mon Sep 17 00:00:00 2001 From: Kouhei Ueno Date: Wed, 30 Jul 2014 19:05:10 +0900 Subject: [PATCH 2/4] use atomic create in EtcdHelper.AtomicUpdate --- pkg/tools/etcd_tools.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 101b43586d3..ccea211b726 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -92,6 +92,11 @@ func IsEtcdTestFailed(err error) bool { return isEtcdErrorNum(err, EtcdErrorCodeTestFailed) } +// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error. +func IsEtcdNodeExist(err error) bool { + return isEtcdErrorNum(err, EtcdErrorCodeNodeExist) +} + // IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop. func IsEtcdWatchStoppedByUser(err error) bool { return etcd.ErrWatchStoppedByUser == err @@ -232,15 +237,20 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E return err } - // First time this key has been used, just set. - if index == 0 { - return h.SetObj(key, ret) - } - data, err := h.Encoding.Encode(ret) if err != nil { return err } + + // First time this key has been used, try creating new value. + if index == 0 { + _, err = h.Client.Create(key, string(data), 0) + if IsEtcdNodeExist(err) { + continue + } + return err + } + _, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index) if IsEtcdTestFailed(err) { continue From 4799b546c915a2f171d913379276529376af486b Mon Sep 17 00:00:00 2001 From: Kouhei Ueno Date: Mon, 4 Aug 2014 00:26:33 +0900 Subject: [PATCH 3/4] Make fake_etcd_client threadsafe --- pkg/tools/fake_etcd_client.go | 39 +++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index feac29b72f7..39a0c320783 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -19,6 +19,7 @@ package tools import ( "errors" "fmt" + "sync" "github.com/coreos/go-etcd/etcd" ) @@ -40,11 +41,12 @@ type FakeEtcdClient struct { Data map[string]EtcdResponseWithError DeletedKeys []string expectNotFoundGetSet map[string]struct{} - Err error - t TestLogger - Ix int - TestIndex bool - ChangeIndex uint64 + sync.Mutex + Err error + t TestLogger + Ix int + TestIndex bool + ChangeIndex uint64 // Will become valid after Watch is called; tester may write to it. Tester may // also read from it to verify that it's closed after injecting an error. @@ -89,11 +91,17 @@ func (f *FakeEtcdClient) generateIndex() uint64 { } func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) { + f.Mutex.Lock() + defer f.Mutex.Unlock() + f.Ix = f.Ix + 1 - return f.Set(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl) + return f.setLocked(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl) } func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) { + f.Mutex.Lock() + defer f.Mutex.Unlock() + result := f.Data[key] if result.R == nil { if _, ok := f.expectNotFoundGetSet[key]; !ok { @@ -110,7 +118,7 @@ func (f *FakeEtcdClient) nodeExists(key string) bool { return ok && result.R != nil && result.R.Node != nil } -func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) { +func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Response, error) { if f.Err != nil { return nil, f.Err } @@ -146,6 +154,13 @@ func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, err return result.R, nil } +func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) { + f.Mutex.Lock() + defer f.Mutex.Unlock() + + return f.setLocked(key, value, ttl) +} + func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) { if f.Err != nil { return nil, f.Err @@ -160,6 +175,9 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue return nil, errors.New("Either prevValue or prevIndex must be specified.") } + f.Mutex.Lock() + defer f.Mutex.Unlock() + if !f.nodeExists(key) { return nil, EtcdErrorNotFound } @@ -174,15 +192,18 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue return nil, EtcdErrorTestFailed } - return f.Set(key, value, ttl) + return f.setLocked(key, value, ttl) } func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) { + f.Mutex.Lock() + defer f.Mutex.Unlock() + if f.nodeExists(key) { return nil, EtcdErrorNodeExist } - return f.Set(key, value, ttl) + return f.setLocked(key, value, ttl) } func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) { From cf117d5b485cbd2470498bd7fa9f4560487a1b8c Mon Sep 17 00:00:00 2001 From: Kouhei Ueno Date: Mon, 4 Aug 2014 00:28:45 +0900 Subject: [PATCH 4/4] Add test for EtcdHelper.AtomicUpdate concurrent create --- pkg/tools/etcd_tools_test.go | 54 +++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 7d52889a886..443df016135 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -19,6 +19,7 @@ package tools import ( "fmt" "reflect" + "sync" "testing" "time" @@ -227,7 +228,7 @@ func TestAtomicUpdate(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") - obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1} + obj := &TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1} err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) { return obj, nil }) @@ -274,6 +275,57 @@ func TestAtomicUpdate(t *testing.T) { } } +func TestAtomicUpdate_CreateCollision(t *testing.T) { + fakeClient := MakeFakeEtcdClient(t) + fakeClient.TestIndex = true + encoding := scheme + helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}} + + fakeClient.ExpectNotFoundGet("/some/key") + + const concurrency = 10 + var wgDone sync.WaitGroup + var wgForceCollision sync.WaitGroup + wgDone.Add(concurrency) + wgForceCollision.Add(concurrency) + + for i := 0; i < concurrency; i++ { + // Increment TestResource.Value by 1 + go func() { + defer wgDone.Done() + + firstCall := true + err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) { + defer func() { firstCall = false }() + + if firstCall { + // Force collision by joining all concurrent AtomicUpdate operations here. + wgForceCollision.Done() + wgForceCollision.Wait() + } + + currValue := in.(*TestResource).Value + obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: currValue + 1} + return obj, nil + }) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + }() + } + wgDone.Wait() + + // Check that stored TestResource has received all updates. + body := fakeClient.Data["/some/key"].R.Node.Value + stored := &TestResource{} + if err := encoding.DecodeInto([]byte(body), stored); err != nil { + t.Errorf("Error decoding stored value: %v", body) + } + if stored.Value != concurrency { + t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value) + } +} + func TestWatchInterpretation_ListAdd(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call")