From a6144f656c864e0cbea4ecf2005d885d06d6677e Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 30 Jun 2014 12:00:14 -0700 Subject: [PATCH 1/2] Move etcd helpers to tools package so they can depend on api package. Add ResourceVersion, hook it up to etcd index to get atomic PUTs. --- pkg/api/helper.go | 7 ++ pkg/api/types.go | 1 + pkg/controller/replication_controller.go | 7 +- pkg/controller/replication_controller_test.go | 7 +- pkg/kubelet/kubelet.go | 5 +- pkg/kubelet/kubelet_test.go | 13 +-- pkg/registry/etcd_registry.go | 10 +- pkg/registry/etcd_registry_test.go | 93 ++++++++++--------- pkg/tools/doc.go | 18 ++++ pkg/{util => tools}/etcd_tools.go | 19 +++- pkg/{util => tools}/etcd_tools_test.go | 7 +- pkg/{util => tools}/fake_etcd_client.go | 2 +- pkg/util/doc.go | 3 +- 13 files changed, 118 insertions(+), 74 deletions(-) create mode 100644 pkg/tools/doc.go rename pkg/{util => tools}/etcd_tools.go (92%) rename pkg/{util => tools}/etcd_tools_test.go (95%) rename pkg/{util => tools}/fake_etcd_client.go (99%) diff --git a/pkg/api/helper.go b/pkg/api/helper.go index cd882110a5e..4c92fd1944f 100644 --- a/pkg/api/helper.go +++ b/pkg/api/helper.go @@ -49,6 +49,13 @@ func AddKnownTypes(types ...interface{}) { } } +// Takes an arbitary api type, returns pointer to its JSONBase field. +// obj must be a pointer to an api type. +func FindJSONBase(obj interface{}) (*JSONBase, error) { + _, jsonBase, err := nameAndJSONBase(obj) + return jsonBase, err +} + // Encode turns the given api object into an appropriate JSON string. // Will return an error if the object doesn't have an embedded JSONBase. // Obj may be a pointer to a struct, or a struct. If a struct, a copy diff --git a/pkg/api/types.go b/pkg/api/types.go index 8e67b20fab3..5104d60f06d 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -150,6 +150,7 @@ type JSONBase struct { ID string `json:"id,omitempty" yaml:"id,omitempty"` CreationTimestamp string `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"` SelfLink string `json:"selfLink,omitempty" yaml:"selfLink,omitempty"` + ResourceVersion uint64 `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty"` } type PodStatus string diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 6847a63cbbd..90a366797d0 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -25,6 +25,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" @@ -34,7 +35,7 @@ import ( // with actual running pods. // TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface type ReplicationManager struct { - etcdClient util.EtcdClient + etcdClient tools.EtcdClient kubeClient client.ClientInterface podControl PodControlInterface syncTime <-chan time.Time @@ -76,7 +77,7 @@ func (r RealPodControl) deletePod(podID string) error { return r.kubeClient.DeletePod(podID) } -func MakeReplicationManager(etcdClient util.EtcdClient, kubeClient client.ClientInterface) *ReplicationManager { +func MakeReplicationManager(etcdClient tools.EtcdClient, kubeClient client.ClientInterface) *ReplicationManager { rm := &ReplicationManager{ kubeClient: kubeClient, etcdClient: etcdClient, @@ -201,7 +202,7 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli func (rm *ReplicationManager) synchronize() { var controllerSpecs []api.ReplicationController - helper := util.EtcdHelper{rm.etcdClient} + helper := tools.EtcdHelper{rm.etcdClient} err := helper.ExtractList("/registry/controllers", &controllerSpecs) if err != nil { glog.Errorf("Synchronization error: %v (%#v)", err, err) diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index e83cf695eea..bda55a82bcb 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" ) @@ -377,8 +378,8 @@ func TestSyncronize(t *testing.T) { }, } - fakeEtcd := util.MakeFakeEtcdClient(t) - fakeEtcd.Data["/registry/controllers"] = util.EtcdResponseWithError{ + fakeEtcd := tools.MakeFakeEtcdClient(t) + fakeEtcd.Data["/registry/controllers"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Nodes: []*etcd.Node{ @@ -432,7 +433,7 @@ func (a *asyncTimeout) done() { func TestWatchControllers(t *testing.T) { defer beginTimeout(20 * time.Second).done() - fakeEtcd := util.MakeFakeEtcdClient(t) + fakeEtcd := tools.MakeFakeEtcdClient(t) manager := MakeReplicationManager(fakeEtcd, nil) var testControllerSpec api.ReplicationController received := make(chan bool) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 7b1efcf4fca..71a99a88868 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -34,6 +34,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" "github.com/fsouza/go-dockerclient" @@ -81,7 +82,7 @@ func New() *Kubelet { // The main kubelet implementation type Kubelet struct { Hostname string - EtcdClient util.EtcdClient + EtcdClient tools.EtcdClient DockerClient DockerInterface DockerPuller DockerPuller CadvisorClient CadvisorInterface @@ -520,7 +521,7 @@ func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.Container func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error { response, err := kl.EtcdClient.Get(key, true, false) if err != nil { - if util.IsEtcdNotFound(err) { + if tools.IsEtcdNotFound(err) { return nil } glog.Errorf("Error on etcd get of %s: %v", key, err) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 9c0ea52db04..d9fe97a5884 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -26,6 +26,7 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" "github.com/fsouza/go-dockerclient" @@ -75,8 +76,8 @@ func verifyError(t *testing.T, e error) { } } -func makeTestKubelet(t *testing.T) (*Kubelet, *util.FakeEtcdClient, *FakeDockerClient) { - fakeEtcdClient := util.MakeFakeEtcdClient(t) +func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDockerClient) { + fakeEtcdClient := tools.MakeFakeEtcdClient(t) fakeDocker := &FakeDockerClient{ err: nil, } @@ -279,7 +280,7 @@ func TestGetKubeletStateFromEtcdNoData(t *testing.T) { kubelet, fakeClient, _ := makeTestKubelet(t) channel := make(chan manifestUpdate) reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: nil, } @@ -298,7 +299,7 @@ func TestGetKubeletStateFromEtcd(t *testing.T) { kubelet, fakeClient, _ := makeTestKubelet(t) channel := make(chan manifestUpdate) reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Value: util.MakeJSONString([]api.Container{}), @@ -319,7 +320,7 @@ func TestGetKubeletStateFromEtcdNotFound(t *testing.T) { kubelet, fakeClient, _ := makeTestKubelet(t) channel := make(chan manifestUpdate) reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: &etcd.EtcdError{ ErrorCode: 100, @@ -338,7 +339,7 @@ func TestGetKubeletStateFromEtcdError(t *testing.T) { kubelet, fakeClient, _ := makeTestKubelet(t) channel := make(chan manifestUpdate) reader := startReading(channel) - fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: &etcd.EtcdError{ ErrorCode: 200, // non not found error diff --git a/pkg/registry/etcd_registry.go b/pkg/registry/etcd_registry.go index 9f8c95aa82e..1f171e0d240 100644 --- a/pkg/registry/etcd_registry.go +++ b/pkg/registry/etcd_registry.go @@ -21,7 +21,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/golang/glog" ) @@ -30,7 +30,7 @@ import ( // EtcdRegistry is an implementation of both ControllerRegistry and PodRegistry which is backed with etcd. type EtcdRegistry struct { - etcdClient util.EtcdClient + etcdClient tools.EtcdClient machines MinionRegistry manifestFactory ManifestFactory } @@ -39,7 +39,7 @@ type EtcdRegistry struct { // 'client' is the connection to etcd // 'machines' is the list of machines // 'scheduler' is the scheduling algorithm to use. -func MakeEtcdRegistry(client util.EtcdClient, machines MinionRegistry) *EtcdRegistry { +func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry { registry := &EtcdRegistry{ etcdClient: client, machines: machines, @@ -54,8 +54,8 @@ func makePodKey(machine, podID string) string { return "/registry/hosts/" + machine + "/pods/" + podID } -func (registry *EtcdRegistry) helper() *util.EtcdHelper { - return &util.EtcdHelper{registry.etcdClient} +func (registry *EtcdRegistry) helper() *tools.EtcdHelper { + return &tools.EtcdHelper{registry.etcdClient} } func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { diff --git a/pkg/registry/etcd_registry_test.go b/pkg/registry/etcd_registry_test.go index 1ea19adf51f..caa5c08bce1 100644 --- a/pkg/registry/etcd_registry_test.go +++ b/pkg/registry/etcd_registry_test.go @@ -23,11 +23,12 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" ) -func MakeTestEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegistry { +func MakeTestEtcdRegistry(client tools.EtcdClient, machines []string) *EtcdRegistry { registry := MakeEtcdRegistry(client, MakeMinionRegistry(machines)) registry.manifestFactory = &BasicManifestFactory{ serviceRegistry: &MockServiceRegistry{}, @@ -36,7 +37,7 @@ func MakeTestEtcdRegistry(client util.EtcdClient, machines []string) *EtcdRegist } func TestEtcdGetPod(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.Set("/registry/hosts/machine/pods/foo", util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) pod, err := registry.GetPod("foo") @@ -47,8 +48,8 @@ func TestEtcdGetPod(t *testing.T) { } func TestEtcdGetPodNotFound(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ + fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -64,8 +65,8 @@ func TestEtcdGetPodNotFound(t *testing.T) { } func TestEtcdCreatePod(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ + fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -106,8 +107,8 @@ func TestEtcdCreatePod(t *testing.T) { } func TestEtcdCreatePodAlreadyExisting(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ + fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), @@ -127,14 +128,14 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) { } func TestEtcdCreatePodWithContainersError(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ + fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, E: &etcd.EtcdError{ErrorCode: 100}, } - fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -159,14 +160,14 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { } func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ + fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, E: &etcd.EtcdError{ErrorCode: 100}, } - fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{ + fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -207,8 +208,8 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { } func TestEtcdCreatePodWithExistingContainers(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{ + fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -254,7 +255,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { } func TestEtcdDeletePod(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods/foo" fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ @@ -277,7 +278,7 @@ func TestEtcdDeletePod(t *testing.T) { } func TestEtcdDeletePodMultipleContainers(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods/foo" fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ @@ -305,9 +306,9 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { } func TestEtcdEmptyListPods(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods" - fakeClient.Data[key] = util.EtcdResponseWithError{ + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Nodes: []*etcd.Node{}, @@ -324,9 +325,9 @@ func TestEtcdEmptyListPods(t *testing.T) { } func TestEtcdListPodsNotFound(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods" - fakeClient.Data[key] = util.EtcdResponseWithError{ + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: &etcd.EtcdError{ErrorCode: 100}, } @@ -339,9 +340,9 @@ func TestEtcdListPodsNotFound(t *testing.T) { } func TestEtcdListPods(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods" - fakeClient.Data[key] = util.EtcdResponseWithError{ + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Nodes: []*etcd.Node{ @@ -369,9 +370,9 @@ func TestEtcdListPods(t *testing.T) { } func TestEtcdListControllersNotFound(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) key := "/registry/controllers" - fakeClient.Data[key] = util.EtcdResponseWithError{ + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: &etcd.EtcdError{ErrorCode: 100}, } @@ -384,9 +385,9 @@ func TestEtcdListControllersNotFound(t *testing.T) { } func TestEtcdListServicesNotFound(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) key := "/registry/services/specs" - fakeClient.Data[key] = util.EtcdResponseWithError{ + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: &etcd.EtcdError{ErrorCode: 100}, } @@ -399,9 +400,9 @@ func TestEtcdListServicesNotFound(t *testing.T) { } func TestEtcdListControllers(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) key := "/registry/controllers" - fakeClient.Data[key] = util.EtcdResponseWithError{ + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Nodes: []*etcd.Node{ @@ -425,7 +426,7 @@ func TestEtcdListControllers(t *testing.T) { } func TestEtcdGetController(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) ctrl, err := registry.GetController("foo") @@ -436,8 +437,8 @@ func TestEtcdGetController(t *testing.T) { } func TestEtcdGetControllerNotFound(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/controllers/foo"] = util.EtcdResponseWithError{ + fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/controllers/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -456,7 +457,7 @@ func TestEtcdGetControllerNotFound(t *testing.T) { } func TestEtcdDeleteController(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.DeleteController("foo") expectNoError(t, err) @@ -470,7 +471,7 @@ func TestEtcdDeleteController(t *testing.T) { } func TestEtcdCreateController(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.CreateController(api.ReplicationController{ JSONBase: api.JSONBase{ @@ -489,7 +490,7 @@ func TestEtcdCreateController(t *testing.T) { } func TestEtcdUpdateController(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.UpdateController(api.ReplicationController{ @@ -506,9 +507,9 @@ func TestEtcdUpdateController(t *testing.T) { } func TestEtcdListServices(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) key := "/registry/services/specs" - fakeClient.Data[key] = util.EtcdResponseWithError{ + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Nodes: []*etcd.Node{ @@ -532,8 +533,8 @@ func TestEtcdListServices(t *testing.T) { } func TestEtcdCreateService(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/services/specs/foo"] = util.EtcdResponseWithError{ + fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/services/specs/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -555,7 +556,7 @@ func TestEtcdCreateService(t *testing.T) { } func TestEtcdGetService(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) service, err := registry.GetService("foo") @@ -566,8 +567,8 @@ func TestEtcdGetService(t *testing.T) { } func TestEtcdGetServiceNotFound(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/services/specs/foo"] = util.EtcdResponseWithError{ + fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.Data["/registry/services/specs/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -583,7 +584,7 @@ func TestEtcdGetServiceNotFound(t *testing.T) { } func TestEtcdDeleteService(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.DeleteService("foo") expectNoError(t, err) @@ -601,7 +602,7 @@ func TestEtcdDeleteService(t *testing.T) { } func TestEtcdUpdateService(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) testService := api.Service{ @@ -623,7 +624,7 @@ func TestEtcdUpdateService(t *testing.T) { } func TestEtcdUpdateEndpoints(t *testing.T) { - fakeClient := util.MakeFakeEtcdClient(t) + fakeClient := tools.MakeFakeEtcdClient(t) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) endpoints := api.Endpoints{ Name: "foo", diff --git a/pkg/tools/doc.go b/pkg/tools/doc.go new file mode 100644 index 00000000000..d1d8014cda2 --- /dev/null +++ b/pkg/tools/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package tools implements general tools which depend on the api package. +package tools diff --git a/pkg/util/etcd_tools.go b/pkg/tools/etcd_tools.go similarity index 92% rename from pkg/util/etcd_tools.go rename to pkg/tools/etcd_tools.go index 39f4b37a3f5..631c925d0b4 100644 --- a/pkg/util/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -14,13 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package tools import ( "encoding/json" "fmt" "reflect" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/coreos/go-etcd/etcd" ) @@ -117,8 +118,14 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // empty responses and nil response nodes exactly like a not found error. func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { - _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) - return err + _, index, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) + if err != nil { + return err + } + if jsonBase, err := api.FindJSONBase(objPtr); err == nil { + jsonBase.ResourceVersion = index + } + return nil } func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) { @@ -147,7 +154,11 @@ func (h *EtcdHelper) SetObj(key string, obj interface{}) error { if err != nil { return err } - _, err = h.Client.Set(key, string(data), 0) + if jsonBase, err := api.FindJSONBase(obj); err == nil && jsonBase.ResourceVersion != 0 { + _, err = h.Client.CompareAndSwap(key, string(data), 0, "", jsonBase.ResourceVersion) + } else { + _, err = h.Client.Set(key, string(data), 0) + } return err } diff --git a/pkg/util/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go similarity index 95% rename from pkg/util/etcd_tools_test.go rename to pkg/tools/etcd_tools_test.go index 30c4c6243b9..3b382877487 100644 --- a/pkg/util/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -14,13 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package tools import ( "fmt" "reflect" "testing" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" ) @@ -83,7 +84,7 @@ func TestExtractList(t *testing.T) { func TestExtractObj(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) expect := testMarshalType{ID: "foo"} - fakeClient.Set("/some/key", MakeJSONString(expect), 0) + fakeClient.Set("/some/key", util.MakeJSONString(expect), 0) helper := EtcdHelper{fakeClient} var got testMarshalType err := helper.ExtractObj("/some/key", &got, false) @@ -143,7 +144,7 @@ func TestSetObj(t *testing.T) { if err != nil { t.Errorf("Unexpected error %#v", err) } - expect := MakeJSONString(obj) + expect := util.MakeJSONString(obj) got := fakeClient.Data["/some/key"].R.Node.Value if expect != got { t.Errorf("Wanted %v, got %v", expect, got) diff --git a/pkg/util/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go similarity index 99% rename from pkg/util/fake_etcd_client.go rename to pkg/tools/fake_etcd_client.go index 36a00752ad8..47509bdff8a 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package tools import ( "fmt" diff --git a/pkg/util/doc.go b/pkg/util/doc.go index 316eedb31df..dac5f137bf0 100644 --- a/pkg/util/doc.go +++ b/pkg/util/doc.go @@ -15,5 +15,6 @@ limitations under the License. */ // Package util implements various utility functions used in both testing and implementation -// of Kubernetes +// of Kubernetes. Package util may not depend on any other package in the Kubernetes +// package tree. package util From 3b9735d787523b3f3d70bd50e635dac36c2d0e43 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Wed, 2 Jul 2014 13:51:27 -0700 Subject: [PATCH 2/2] Test atomic PUTs, and make them work. Improve apiserver/logger.go's interface (it's pretty cool now). Improve apiserver's error reporting to clients. Improve client's handling of errors from apiserver. Make failed PUTs return 409 (conflict)-- http status codes are amazingly well defined for what we're doing! --- cmd/integration/integration.go | 86 +++++++++++++++++++++++++++++++++- pkg/api/helper.go | 17 +++++++ pkg/api/types.go | 6 ++- pkg/apiserver/apiserver.go | 36 +++++++++++--- pkg/apiserver/logger.go | 68 +++++++++++++++++++++++---- pkg/client/client.go | 23 ++++++--- pkg/kubelet/kubelet_server.go | 4 +- pkg/tools/etcd_tools.go | 30 +++++++----- 8 files changed, 229 insertions(+), 41 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index e6e9f183bed..12239f59a80 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -23,7 +23,9 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "reflect" "runtime" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -128,8 +130,73 @@ func runReplicationControllerTest(kubeClient *client.Client) { glog.Infof("Replication controller produced:\n\n%#v\n\n", pods) } +func runAtomicPutTest(c *client.Client) { + var svc api.Service + err := c.Post().Path("services").Body( + api.Service{ + JSONBase: api.JSONBase{ID: "atomicService"}, + Port: 12345, + Labels: map[string]string{ + "name": "atomicService", + }, + }, + ).Do().Into(&svc) + if err != nil { + glog.Fatalf("Failed creating atomicService: %v", err) + } + + testLabels := labels.Set{} + for i := 0; i < 26; i++ { + // a: z, b: y, etc... + testLabels[string([]byte{byte('a' + i)})] = string([]byte{byte('z' - i)}) + } + var wg sync.WaitGroup + wg.Add(len(testLabels)) + for label, value := range testLabels { + go func(l, v string) { + for { + var tmpSvc api.Service + err := c.Get().Path("services").Path(svc.ID).Do().Into(&tmpSvc) + if err != nil { + glog.Errorf("Error getting atomicService: %v", err) + continue + } + if tmpSvc.Selector == nil { + tmpSvc.Selector = map[string]string{l: v} + } else { + tmpSvc.Selector[l] = v + } + err = c.Put().Path("services").Path(svc.ID).Body(&tmpSvc).Do().Error() + if err != nil { + if se, ok := err.(*client.StatusErr); ok { + if se.Status.Code == http.StatusConflict { + // This is what we expect. + continue + } + } + glog.Errorf("Unexpected error putting atomicService: %v", err) + continue + } + break + } + wg.Done() + }(label, value) + } + wg.Wait() + err = c.Get().Path("services").Path(svc.ID).Do().Into(&svc) + if err != nil { + glog.Fatalf("Failed getting atomicService after writers are complete: %v", err) + } + if !reflect.DeepEqual(testLabels, labels.Set(svc.Selector)) { + glog.Fatalf("Selector PUTs were not atomic: wanted %v, got %v", testLabels, svc.Selector) + } + glog.Info("Atomic PUTs work.") +} + +type testFunc func(*client.Client) + func main() { - runtime.GOMAXPROCS(4) + runtime.GOMAXPROCS(runtime.NumCPU()) util.ReallyCrash = true util.InitLogs() defer util.FlushLogs() @@ -150,7 +217,22 @@ func main() { time.Sleep(time.Second * 10) kubeClient := client.New(apiServerURL, nil) - runReplicationControllerTest(kubeClient) + + // Run tests in parallel + testFuncs := []testFunc{ + runReplicationControllerTest, + runAtomicPutTest, + } + var wg sync.WaitGroup + wg.Add(len(testFuncs)) + for i := range testFuncs { + f := testFuncs[i] + go func() { + f(kubeClient) + wg.Done() + }() + } + wg.Wait() // Check that kubelet tried to make the pods. // Using a set to list unique creation attempts. Our fake is diff --git a/pkg/api/helper.go b/pkg/api/helper.go index 4c92fd1944f..2d031d83ddd 100644 --- a/pkg/api/helper.go +++ b/pkg/api/helper.go @@ -56,6 +56,23 @@ func FindJSONBase(obj interface{}) (*JSONBase, error) { return jsonBase, err } +// Takes an arbitary api type, return a copy of its JSONBase field. +// obj may be a pointer to an api type, or a non-pointer struct api type. +func FindJSONBaseRO(obj interface{}) (JSONBase, error) { + v := reflect.ValueOf(obj) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + if v.Kind() != reflect.Struct { + return JSONBase{}, fmt.Errorf("expected struct, but got %v", v.Type().Name()) + } + jsonBase := v.FieldByName("JSONBase") + if !jsonBase.IsValid() { + return JSONBase{}, fmt.Errorf("struct %v lacks embedded JSON type", v.Type().Name()) + } + return jsonBase.Interface().(JSONBase), nil +} + // Encode turns the given api object into an appropriate JSON string. // Will return an error if the object doesn't have an embedded JSONBase. // Obj may be a pointer to a struct, or a struct. If a struct, a copy diff --git a/pkg/api/types.go b/pkg/api/types.go index 5104d60f06d..4627481e8ac 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -267,8 +267,12 @@ type Status struct { // One of: "success", "failure", "working" (for operations not yet completed) // TODO: if "working", include an operation identifier so final status can be // checked. - Status string `json:"status,omitempty" yaml:"status,omitempty"` + Status string `json:"status,omitempty" yaml:"status,omitempty"` + // Details about the status. May be an error description or an + // operation number for later polling. Details string `json:"details,omitempty" yaml:"details,omitempty"` + // Suggested HTTP return code for this status, 0 if not set. + Code int `json:"code,omitempty" yaml:"code,omitempty"` } // Values of Status.Status diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 92cfe11c726..64e43b95c1b 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -27,6 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) @@ -54,9 +55,15 @@ func MakeAsync(fn WorkFunc) <-chan interface{} { defer util.HandleCrash() obj, err := fn() if err != nil { + status := http.StatusInternalServerError + switch { + case tools.IsEtcdConflict(err): + status = http.StatusConflict + } channel <- &api.Status{ Status: api.StatusFailure, Details: err.Error(), + Code: status, } } else { channel <- obj @@ -110,9 +117,13 @@ func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { glog.Infof("ApiServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack()) } }() - logger := MakeLogged(req, w) - w = logger - defer logger.Log() + defer MakeLogged(req, &w).StacktraceWhen( + StatusIsNot( + http.StatusOK, + http.StatusAccepted, + http.StatusConflict, + ), + ).Log() url, err := url.ParseRequestURI(req.RequestURI) if err != nil { server.error(err, w) @@ -141,7 +152,7 @@ func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { } storage := server.storage[requestParts[0]] if storage == nil { - logger.Addf("'%v' has no storage object", requestParts[0]) + LogOf(w).Addf("'%v' has no storage object", requestParts[0]) server.notFound(req, w) return } else { @@ -171,8 +182,7 @@ func (server *ApiServer) error(err error, w http.ResponseWriter) { func (server *ApiServer) readBody(req *http.Request) ([]byte, error) { defer req.Body.Close() - body, err := ioutil.ReadAll(req.Body) - return body, err + return ioutil.ReadAll(req.Body) } // finishReq finishes up a request, waiting until the operation finishes or, after a timeout, creating an @@ -184,7 +194,19 @@ func (server *ApiServer) finishReq(out <-chan interface{}, sync bool, timeout ti } obj, complete := op.StatusOrResult() if complete { - server.write(http.StatusOK, obj, w) + status := http.StatusOK + switch stat := obj.(type) { + case api.Status: + LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.") + if stat.Code != 0 { + status = stat.Code + } + case *api.Status: + if stat.Code != 0 { + status = stat.Code + } + } + server.write(status, obj, w) } else { server.write(http.StatusAccepted, obj, w) } diff --git a/pkg/apiserver/logger.go b/pkg/apiserver/logger.go index da82773cb61..3d881dee58b 100644 --- a/pkg/apiserver/logger.go +++ b/pkg/apiserver/logger.go @@ -25,6 +25,9 @@ import ( "github.com/golang/glog" ) +// Return true if a stacktrace should be logged for this status +type StacktracePred func(httpStatus int) (logStacktrace bool) + // Add a layer on top of ResponseWriter, so we can track latency and error // message sources. type respLogger struct { @@ -35,17 +38,64 @@ type respLogger struct { req *http.Request w http.ResponseWriter + + logStacktracePred StacktracePred } +func DefaultStacktracePred(status int) bool { + return status != http.StatusOK && status != http.StatusAccepted +} + +// MakeLogged turns a normal response writer into a logged response writer. +// // Usage: -// logger := MakeLogged(req, w) -// w = logger // Route response writing actions through w -// defer logger.Log() -func MakeLogged(req *http.Request, w http.ResponseWriter) *respLogger { - return &respLogger{ - startTime: time.Now(), - req: req, - w: w, +// +// defer MakeLogged(req, &w).StacktraceWhen(StatusIsNot(200, 202)).Log() +// +// (Only the call to Log() is defered, so you can set everything up in one line!) +// +// Note that this *changes* your writer, to route response writing actions +// through the logger. +// +// Use LogOf(w).Addf(...) to log something along with the response result. +func MakeLogged(req *http.Request, w *http.ResponseWriter) *respLogger { + rl := &respLogger{ + startTime: time.Now(), + req: req, + w: *w, + logStacktracePred: DefaultStacktracePred, + } + *w = rl // hijack caller's writer! + return rl +} + +// LogOf returns the logger hiding in w. Panics if there isn't such a logger, +// because MakeLogged() must have been previously called for the log to work. +func LogOf(w http.ResponseWriter) *respLogger { + if rl, ok := w.(*respLogger); ok { + return rl + } + panic("Logger not installed yet!") + return nil +} + +// Sets the stacktrace logging predicate, which decides when to log a stacktrace. +// There's a default, so you don't need to call this unless you don't like the default. +func (rl *respLogger) StacktraceWhen(pred StacktracePred) *respLogger { + rl.logStacktracePred = pred + return rl +} + +// StatusIsNot returns a StacktracePred which will cause stacktraces to be logged +// for any status *not* in the given list. +func StatusIsNot(statuses ...int) StacktracePred { + return func(status int) bool { + for _, s := range statuses { + if status == s { + return false + } + } + return true } } @@ -73,7 +123,7 @@ func (rl *respLogger) Write(b []byte) (int, error) { // Implement http.ResponseWriter func (rl *respLogger) WriteHeader(status int) { rl.status = status - if status != http.StatusOK && status != http.StatusAccepted { + if rl.logStacktracePred(status) { // Only log stacks for errors stack := make([]byte, 2048) stack = stack[:runtime.Stack(stack, false)] diff --git a/pkg/client/client.go b/pkg/client/client.go index 6ee978abc44..effccae0562 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -100,16 +100,27 @@ func (c *Client) doRequest(request *http.Request) ([]byte, error) { if err != nil { return body, err } - if response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent { + + // Did the server give us a status response? + isStatusResponse := false + var status api.Status + if err := api.DecodeInto(body, &status); err == nil && status.Status != "" { + isStatusResponse = true + } + + switch { + case response.StatusCode == http.StatusConflict: + // Return error given by server, if there was one. + if isStatusResponse { + return nil, &StatusErr{status} + } + fallthrough + case response.StatusCode < http.StatusOK || response.StatusCode > http.StatusPartialContent: return nil, fmt.Errorf("request [%#v] failed (%d) %s: %s", request, response.StatusCode, response.Status, string(body)) } // If the server gave us a status back, look at what it was. - var status api.Status - if err := api.DecodeInto(body, &status); err == nil && status.Status != "" { - if status.Status == api.StatusSuccess { - return body, nil - } + if isStatusResponse && status.Status != api.StatusSuccess { // "Working" requests need to be handled specially. // "Failed" requests are clearly just an error and it makes sense to return them as such. return nil, &StatusErr{status} diff --git a/pkg/kubelet/kubelet_server.go b/pkg/kubelet/kubelet_server.go index 0883dc61454..41bc6d8bd42 100644 --- a/pkg/kubelet/kubelet_server.go +++ b/pkg/kubelet/kubelet_server.go @@ -46,9 +46,7 @@ func (s *KubeletServer) error(w http.ResponseWriter, err error) { } func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - logger := apiserver.MakeLogged(req, w) - w = logger - defer logger.Log() + defer apiserver.MakeLogged(req, &w).Log() u, err := url.ParseRequestURI(req.RequestURI) if err != nil { diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 631c925d0b4..9f4b625e0d8 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -118,14 +118,8 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // empty responses and nil response nodes exactly like a not found error. func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { - _, index, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) - if err != nil { - return err - } - if jsonBase, err := api.FindJSONBase(objPtr); err == nil { - jsonBase.ResourceVersion = index - } - return nil + _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) + return err } func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) { @@ -145,20 +139,30 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response) } body = response.Node.Value - return body, response.Node.ModifiedIndex, json.Unmarshal([]byte(body), objPtr) + err = json.Unmarshal([]byte(body), objPtr) + if jsonBase, err := api.FindJSONBase(objPtr); err == nil { + jsonBase.ResourceVersion = response.Node.ModifiedIndex + // Note that err shadows the err returned below, so we won't + // return an error just because we failed to find a JSONBase. + // This is intentional. + } + return body, response.Node.ModifiedIndex, err } -// SetObj marshals obj via json, and stores under key. +// SetObj marshals obj via json, and stores under key. Will do an +// atomic update if obj's ResourceVersion field is set. func (h *EtcdHelper) SetObj(key string, obj interface{}) error { data, err := json.Marshal(obj) if err != nil { return err } - if jsonBase, err := api.FindJSONBase(obj); err == nil && jsonBase.ResourceVersion != 0 { + if jsonBase, err := api.FindJSONBaseRO(obj); err == nil && jsonBase.ResourceVersion != 0 { _, err = h.Client.CompareAndSwap(key, string(data), 0, "", jsonBase.ResourceVersion) - } else { - _, err = h.Client.Set(key, string(data), 0) + return err // err is shadowed! } + + // TODO: when client supports atomic creation, integrate this with the above. + _, err = h.Client.Set(key, string(data), 0) return err }