From cf117d5b485cbd2470498bd7fa9f4560487a1b8c Mon Sep 17 00:00:00 2001 From: Kouhei Ueno Date: Mon, 4 Aug 2014 00:28:45 +0900 Subject: [PATCH] Add test for EtcdHelper.AtomicUpdate concurrent create --- pkg/tools/etcd_tools_test.go | 54 +++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 7d52889a886..443df016135 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -19,6 +19,7 @@ package tools import ( "fmt" "reflect" + "sync" "testing" "time" @@ -227,7 +228,7 @@ func TestAtomicUpdate(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") - obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1} + obj := &TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1} err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) { return obj, nil }) @@ -274,6 +275,57 @@ func TestAtomicUpdate(t *testing.T) { } } +func TestAtomicUpdate_CreateCollision(t *testing.T) { + fakeClient := MakeFakeEtcdClient(t) + fakeClient.TestIndex = true + encoding := scheme + helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}} + + fakeClient.ExpectNotFoundGet("/some/key") + + const concurrency = 10 + var wgDone sync.WaitGroup + var wgForceCollision sync.WaitGroup + wgDone.Add(concurrency) + wgForceCollision.Add(concurrency) + + for i := 0; i < concurrency; i++ { + // Increment TestResource.Value by 1 + go func() { + defer wgDone.Done() + + firstCall := true + err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) { + defer func() { firstCall = false }() + + if firstCall { + // Force collision by joining all concurrent AtomicUpdate operations here. + wgForceCollision.Done() + wgForceCollision.Wait() + } + + currValue := in.(*TestResource).Value + obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: currValue + 1} + return obj, nil + }) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + }() + } + wgDone.Wait() + + // Check that stored TestResource has received all updates. + body := fakeClient.Data["/some/key"].R.Node.Value + stored := &TestResource{} + if err := encoding.DecodeInto([]byte(body), stored); err != nil { + t.Errorf("Error decoding stored value: %v", body) + } + if stored.Value != concurrency { + t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value) + } +} + func TestWatchInterpretation_ListAdd(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call")