diff --git a/pkg/registry/etcd_registry.go b/pkg/registry/etcd_registry.go index 0372aa3da2f..b9cee2cc3b8 100644 --- a/pkg/registry/etcd_registry.go +++ b/pkg/registry/etcd_registry.go @@ -17,7 +17,6 @@ limitations under the License. package registry import ( - "encoding/json" "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -90,56 +89,35 @@ func makeContainerKey(machine string) string { return "/registry/hosts/" + machine + "/kubelet" } -func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, index uint64, err error) { - index, err = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true) - return manifests, index, err -} - -func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest, index uint64) error { - if index != 0 { - return registry.helper().CompareAndSwapObj(makeContainerKey(machine), manifests, index) - } else { - return registry.helper().SetObj(makeContainerKey(machine), manifests) - } -} - func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error { podOut, machine, err := registry.findPod(pod.ID) if err == nil { + // TODO: this error message looks racy. return fmt.Errorf("a pod named %s already exists on %s (%#v)", pod.ID, machine, podOut) } return registry.runPod(pod, machineIn) } func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error { - manifests, index, err := registry.loadManifests(machine) - if err != nil { - return err - } - - key := makePodKey(machine, pod.ID) - data, err := json.Marshal(pod) - if err != nil { - return err - } - _, err = registry.etcdClient.Create(key, string(data), 0) + podKey := makePodKey(machine, pod.ID) + err := registry.helper().SetObj(podKey, pod) manifest, err := registry.manifestFactory.MakeManifest(machine, pod) if err != nil { return err } - for { - manifests = append(manifests, manifest) - err = registry.updateManifests(machine, manifests, index) - if util.IsEtcdConflict(err) { - manifests, index, err = registry.loadManifests(machine) - if err != nil { - return err - } - continue - } + + contKey := makeContainerKey(machine) + var manifests []api.ContainerManifest + err = registry.helper().AtomicUpdate(contKey, &manifests, func() (interface{}, error) { + return append(manifests, manifest), nil + }) + if err != nil { + // Don't strand stuff. + registry.etcdClient.Delete(podKey, false) return err } + return nil } func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error { @@ -155,12 +133,19 @@ func (registry *EtcdRegistry) DeletePod(podID string) error { } func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error { - for { - manifests, index, err := registry.loadManifests(machine) - if err != nil { - return err - } - newManifests := make([]api.ContainerManifest, 0) + // First delete the pod, so a scheduler doesn't notice it getting removed from the + // machine and attempt to put it somewhere. + podKey := makePodKey(machine, podID) + _, err := registry.etcdClient.Delete(podKey, true) + if err != nil { + return err + } + + // Next, remove the pod from the machine atomically. + contKey := makeContainerKey(machine) + var manifests []api.ContainerManifest + return registry.helper().AtomicUpdate(contKey, &manifests, func() (interface{}, error) { + newManifests := make([]api.ContainerManifest, 0, len(manifests)) found := false for _, manifest := range manifests { if manifest.Id != podID { @@ -175,22 +160,13 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error // However it is "deleted" so log it and move on glog.Infof("Couldn't find: %s in %#v", podID, manifests) } - if err = registry.updateManifests(machine, newManifests, index); err != nil { - if util.IsEtcdConflict(err) { - continue - } - return err - } - break - } - key := makePodKey(machine, podID) - _, err := registry.etcdClient.Delete(key, true) - return err + return newManifests, nil + }) } func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) { key := makePodKey(machine, podID) - _, err = registry.helper().ExtractObj(key, &pod, false) + err = registry.helper().ExtractObj(key, &pod, false) if err != nil { return } @@ -225,7 +201,7 @@ func makeControllerKey(id string) string { func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) { var controller api.ReplicationController key := makeControllerKey(controllerID) - _, err := registry.helper().ExtractObj(key, &controller, false) + err := registry.helper().ExtractObj(key, &controller, false) if err != nil { return nil, err } @@ -264,7 +240,7 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error { func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) { key := makeServiceKey(name) var svc api.Service - _, err := registry.helper().ExtractObj(key, &svc, false) + err := registry.helper().ExtractObj(key, &svc, false) if err != nil { return nil, err } diff --git a/pkg/registry/etcd_registry_test.go b/pkg/registry/etcd_registry_test.go index e48475334b9..2fe21a79b59 100644 --- a/pkg/registry/etcd_registry_test.go +++ b/pkg/registry/etcd_registry_test.go @@ -267,8 +267,7 @@ func TestEtcdDeletePod(t *testing.T) { expectNoError(t, err) if len(fakeClient.DeletedKeys) != 1 { t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) - } - if fakeClient.DeletedKeys[0] != key { + } else if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) diff --git a/pkg/util/etcd_tools.go b/pkg/util/etcd_tools.go index aeb2d45bc06..912147ab55d 100644 --- a/pkg/util/etcd_tools.go +++ b/pkg/util/etcd_tools.go @@ -116,27 +116,10 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { // Unmarshals json found at key into objPtr. On a not found error, will either return // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // empty responses and nil response nodes exactly like a not found error. -func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (modifiedIndex uint64, err error) { - _, modifiedIndex, err = h.bodyAndExtractObj(key, objPtr, ignoreNotFound) - return modifiedIndex, err -} - -// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches -// the previous modified index. -func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64) error { - data, err := json.Marshal(obj) - if err != nil { - return err - } - _, err = h.Client.CompareAndSwap(key, string(data), 0, "", index) - return err -} - -/* func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) return err -}*/ +} func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) { response, err := h.Client.Get(key, false, false) @@ -202,11 +185,19 @@ func (h *EtcdHelper) AtomicUpdate(key string, objPtr interface{}, tryUpdate Etcd return err } + // First time this key has been used, just set. + if index == 0 { + //return h.SetObj(key, ret) + } + data, err := json.Marshal(ret) if err != nil { return err } _, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index) + if IsEtcdConflict(err) { + continue + } return err } } diff --git a/pkg/util/etcd_tools_test.go b/pkg/util/etcd_tools_test.go index 4355dcd66b9..30c4c6243b9 100644 --- a/pkg/util/etcd_tools_test.go +++ b/pkg/util/etcd_tools_test.go @@ -86,7 +86,7 @@ func TestExtractObj(t *testing.T) { fakeClient.Set("/some/key", MakeJSONString(expect), 0) helper := EtcdHelper{fakeClient} var got testMarshalType - _, err := helper.ExtractObj("/some/key", &got, false) + err := helper.ExtractObj("/some/key", &got, false) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -120,11 +120,11 @@ func TestExtractObjNotFoundErr(t *testing.T) { helper := EtcdHelper{fakeClient} try := func(key string) { var got testMarshalType - _, err := helper.ExtractObj(key, &got, false) + err := helper.ExtractObj(key, &got, false) if err == nil { t.Errorf("%s: wanted error but didn't get one", key) } - _, err = helper.ExtractObj(key, &got, true) + err = helper.ExtractObj(key, &got, true) if err != nil { t.Errorf("%s: didn't want error but got %#v", key, err) } diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index 918af95064c..9fddd076695 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -61,6 +61,7 @@ func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, f.t.Errorf("Unexpected get for %s", key) return &etcd.Response{}, &etcd.EtcdError{ErrorCode: 100} // Key not found } + f.t.Logf("returning %v: %v %#v", key, result.R, result.E) return result.R, result.E } @@ -85,6 +86,13 @@ func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, return f.Set(key, value, ttl) } func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) { + f.Data[key] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: &etcd.EtcdError{ErrorCode: 100}, + } + f.DeletedKeys = append(f.DeletedKeys, key) return &etcd.Response{}, f.Err }