diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 66b214720f3..19dc55170a7 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -41,6 +41,12 @@ func main() { util.InitLogs() defer util.FlushLogs() + go func() { + defer util.FlushLogs() + time.Sleep(3 * time.Minute) + glog.Fatalf("This test has timed out.") + }() + manifestUrl := ServeCachedManifestFile() // Setup servers := []string{"http://localhost:4001"} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 047ccd09d4b..79bca4b8233 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -436,10 +436,17 @@ func TestSyncManifestsDeletes(t *testing.T) { err := kubelet.SyncManifests([]api.ContainerManifest{}) expectNoError(t, err) verifyCalls(t, fakeDocker, []string{"list", "stop", "stop"}) + + // A map interation is used to delete containers, so must not depend on + // order here. + expectedToStop := map[string]bool{ + "1234": true, + "9876": true, + } if len(fakeDocker.stopped) != 2 || - fakeDocker.stopped[0] != "1234" || - fakeDocker.stopped[1] != "9876" { - t.Errorf("Unexpected sequence of stopped containers: %s", fakeDocker.stopped) + !expectedToStop[fakeDocker.stopped[0]] || + !expectedToStop[fakeDocker.stopped[1]] { + t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped) } } diff --git a/pkg/registry/etcd_registry.go b/pkg/registry/etcd_registry.go index b2a7a68c898..162d62c61ba 100644 --- a/pkg/registry/etcd_registry.go +++ b/pkg/registry/etcd_registry.go @@ -17,7 +17,6 @@ limitations under the License. package registry import ( - "encoding/json" "fmt" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -90,56 +89,37 @@ func makeContainerKey(machine string) string { return "/registry/hosts/" + machine + "/kubelet" } -func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, index uint64, err error) { - err, index = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true) - return manifests, index, err -} - -func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest, index uint64) error { - if index != 0 { - return registry.helper().CompareAndSwapObj(makeContainerKey(machine), manifests, index) - } else { - return registry.helper().SetObj(makeContainerKey(machine), manifests) - } -} - func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error { podOut, machine, err := registry.findPod(pod.ID) if err == nil { + // TODO: this error message looks racy. return fmt.Errorf("a pod named %s already exists on %s (%#v)", pod.ID, machine, podOut) } return registry.runPod(pod, machineIn) } func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error { - manifests, index, err := registry.loadManifests(machine) - if err != nil { - return err - } - - key := makePodKey(machine, pod.ID) - data, err := json.Marshal(pod) - if err != nil { - return err - } - _, err = registry.etcdClient.Create(key, string(data), 0) + podKey := makePodKey(machine, pod.ID) + err := registry.helper().SetObj(podKey, pod) manifest, err := registry.manifestFactory.MakeManifest(machine, pod) if err != nil { return err } - for { - manifests = append(manifests, manifest) - err = registry.updateManifests(machine, manifests, index) - if util.IsEtcdConflict(err) { - manifests, index, err = registry.loadManifests(machine) - if err != nil { - return err - } - continue + + contKey := makeContainerKey(machine) + err = registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) { + manifests := *in.(*[]api.ContainerManifest) + return append(manifests, manifest), nil + }) + if err != nil { + // Don't strand stuff. + _, err2 := registry.etcdClient.Delete(podKey, false) + if err2 != nil { + glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2) } - return err } + return err } func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error { @@ -155,12 +135,19 @@ func (registry *EtcdRegistry) DeletePod(podID string) error { } func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error { - for { - manifests, index, err := registry.loadManifests(machine) - if err != nil { - return err - } - newManifests := make([]api.ContainerManifest, 0) + // First delete the pod, so a scheduler doesn't notice it getting removed from the + // machine and attempt to put it somewhere. + podKey := makePodKey(machine, podID) + _, err := registry.etcdClient.Delete(podKey, true) + if err != nil { + return err + } + + // Next, remove the pod from the machine atomically. + contKey := makeContainerKey(machine) + return registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) { + manifests := *in.(*[]api.ContainerManifest) + newManifests := make([]api.ContainerManifest, 0, len(manifests)) found := false for _, manifest := range manifests { if manifest.Id != podID { @@ -175,22 +162,13 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error // However it is "deleted" so log it and move on glog.Infof("Couldn't find: %s in %#v", podID, manifests) } - if err = registry.updateManifests(machine, newManifests, index); err != nil { - if util.IsEtcdConflict(err) { - continue - } - return err - } - break - } - key := makePodKey(machine, podID) - _, err := registry.etcdClient.Delete(key, true) - return err + return newManifests, nil + }) } func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) { key := makePodKey(machine, podID) - err, _ = registry.helper().ExtractObj(key, &pod, false) + err = registry.helper().ExtractObj(key, &pod, false) if err != nil { return } @@ -225,7 +203,7 @@ func makeControllerKey(id string) string { func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) { var controller api.ReplicationController key := makeControllerKey(controllerID) - err, _ := registry.helper().ExtractObj(key, &controller, false) + err := registry.helper().ExtractObj(key, &controller, false) if err != nil { return nil, err } @@ -264,7 +242,7 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error { func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) { key := makeServiceKey(name) var svc api.Service - err, _ := registry.helper().ExtractObj(key, &svc, false) + err := registry.helper().ExtractObj(key, &svc, false) if err != nil { return nil, err } diff --git a/pkg/registry/etcd_registry_test.go b/pkg/registry/etcd_registry_test.go index e48475334b9..2fe21a79b59 100644 --- a/pkg/registry/etcd_registry_test.go +++ b/pkg/registry/etcd_registry_test.go @@ -267,8 +267,7 @@ func TestEtcdDeletePod(t *testing.T) { expectNoError(t, err) if len(fakeClient.DeletedKeys) != 1 { t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys) - } - if fakeClient.DeletedKeys[0] != key { + } else if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) diff --git a/pkg/util/etcd_tools.go b/pkg/util/etcd_tools.go index 26b2641939c..39f4b37a3f5 100644 --- a/pkg/util/etcd_tools.go +++ b/pkg/util/etcd_tools.go @@ -116,33 +116,29 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { // Unmarshals json found at key into objPtr. On a not found error, will either return // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // empty responses and nil response nodes exactly like a not found error. -func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (error, uint64) { +func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { + _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) + return err +} + +func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) { response, err := h.Client.Get(key, false, false) if err != nil && !IsEtcdNotFound(err) { - return err, 0 + return "", 0, err } if err != nil || response.Node == nil || len(response.Node.Value) == 0 { if ignoreNotFound { pv := reflect.ValueOf(objPtr) pv.Elem().Set(reflect.Zero(pv.Type().Elem())) - return nil, 0 + return "", 0, nil } else if err != nil { - return err, 0 + return "", 0, err } - return fmt.Errorf("key '%v' found no nodes field: %#v", key, response), 0 + return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response) } - return json.Unmarshal([]byte(response.Node.Value), objPtr), response.Node.ModifiedIndex -} - -// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches the previous modified index -func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64) error { - data, err := json.Marshal(obj) - if err != nil { - return err - } - _, err = h.Client.CompareAndSwap(key, string(data), 0, "", index) - return err + body = response.Node.Value + return body, response.Node.ModifiedIndex, json.Unmarshal([]byte(body), objPtr) } // SetObj marshals obj via json, and stores under key. @@ -154,3 +150,63 @@ func (h *EtcdHelper) SetObj(key string, obj interface{}) error { _, err = h.Client.Set(key, string(data), 0) return err } + +// Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update. +// See the comment for AtomicUpdate for more detail. +type EtcdUpdateFunc func(input interface{}) (output interface{}, err error) + +// AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects. +// Note, tryUpdate may be called more than once. +// +// Example: +// +// h := &util.EtcdHelper{client} +// err := h.AtomicUpdate("myKey", &MyType{}, func(input interface{}) (interface{}, error) { +// // Before this function is called, currentObj has been reset to etcd's current +// // contents for "myKey". +// +// cur := input.(*MyType) // Gauranteed to work. +// +// // Make a *modification*. +// cur.Counter++ +// +// // Return the modified object. Return an error to stop iterating. +// return cur, nil +// }) +// +func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate EtcdUpdateFunc) error { + pt := reflect.TypeOf(ptrToType) + if pt.Kind() != reflect.Ptr { + // Panic is appropriate, because this is a programming error. + panic("need ptr to type") + } + for { + obj := reflect.New(pt.Elem()).Interface() + origBody, index, err := h.bodyAndExtractObj(key, obj, true) + if err != nil { + return err + } + + ret, err := tryUpdate(obj) + if err != nil { + return err + } + + // First time this key has been used, just set. + // TODO: This is racy. Fix when our client supports prevExist. See: + // https://github.com/coreos/etcd/blob/master/Documentation/api.md#atomic-compare-and-swap + if index == 0 { + return h.SetObj(key, ret) + } + + data, err := json.Marshal(ret) + if err != nil { + return err + } + _, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index) + if IsEtcdConflict(err) { + continue + } + return err + } +} diff --git a/pkg/util/etcd_tools_test.go b/pkg/util/etcd_tools_test.go index a322e504216..30c4c6243b9 100644 --- a/pkg/util/etcd_tools_test.go +++ b/pkg/util/etcd_tools_test.go @@ -86,7 +86,7 @@ func TestExtractObj(t *testing.T) { fakeClient.Set("/some/key", MakeJSONString(expect), 0) helper := EtcdHelper{fakeClient} var got testMarshalType - err, _ := helper.ExtractObj("/some/key", &got, false) + err := helper.ExtractObj("/some/key", &got, false) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -120,11 +120,11 @@ func TestExtractObjNotFoundErr(t *testing.T) { helper := EtcdHelper{fakeClient} try := func(key string) { var got testMarshalType - err, _ := helper.ExtractObj(key, &got, false) + err := helper.ExtractObj(key, &got, false) if err == nil { t.Errorf("%s: wanted error but didn't get one", key) } - err, _ = helper.ExtractObj(key, &got, true) + err = helper.ExtractObj(key, &got, true) if err != nil { t.Errorf("%s: didn't want error but got %#v", key, err) } diff --git a/pkg/util/fake_etcd_client.go b/pkg/util/fake_etcd_client.go index 918af95064c..9fddd076695 100644 --- a/pkg/util/fake_etcd_client.go +++ b/pkg/util/fake_etcd_client.go @@ -61,6 +61,7 @@ func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, f.t.Errorf("Unexpected get for %s", key) return &etcd.Response{}, &etcd.EtcdError{ErrorCode: 100} // Key not found } + f.t.Logf("returning %v: %v %#v", key, result.R, result.E) return result.R, result.E } @@ -85,6 +86,13 @@ func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, return f.Set(key, value, ttl) } func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) { + f.Data[key] = EtcdResponseWithError{ + R: &etcd.Response{ + Node: nil, + }, + E: &etcd.EtcdError{ErrorCode: 100}, + } + f.DeletedKeys = append(f.DeletedKeys, key) return &etcd.Response{}, f.Err }