diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 66b214720f3..19dc55170a7 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -41,6 +41,12 @@ func main() { util.InitLogs() defer util.FlushLogs() + go func() { + defer util.FlushLogs() + time.Sleep(3 * time.Minute) + glog.Fatalf("This test has timed out.") + }() + manifestUrl := ServeCachedManifestFile() // Setup servers := []string{"http://localhost:4001"} diff --git a/pkg/registry/etcd_registry.go b/pkg/registry/etcd_registry.go index b9cee2cc3b8..162d62c61ba 100644 --- a/pkg/registry/etcd_registry.go +++ b/pkg/registry/etcd_registry.go @@ -108,16 +108,18 @@ func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error { } contKey := makeContainerKey(machine) - var manifests []api.ContainerManifest - err = registry.helper().AtomicUpdate(contKey, &manifests, func() (interface{}, error) { + err = registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) { + manifests := *in.(*[]api.ContainerManifest) return append(manifests, manifest), nil }) if err != nil { // Don't strand stuff. - registry.etcdClient.Delete(podKey, false) - return err + _, err2 := registry.etcdClient.Delete(podKey, false) + if err2 != nil { + glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2) + } } - return nil + return err } func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error { @@ -143,8 +145,8 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error // Next, remove the pod from the machine atomically. contKey := makeContainerKey(machine) - var manifests []api.ContainerManifest - return registry.helper().AtomicUpdate(contKey, &manifests, func() (interface{}, error) { + return registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) { + manifests := *in.(*[]api.ContainerManifest) newManifests := make([]api.ContainerManifest, 0, len(manifests)) found := false for _, manifest := range manifests { diff --git a/pkg/util/etcd_tools.go b/pkg/util/etcd_tools.go index 912147ab55d..39f4b37a3f5 100644 --- a/pkg/util/etcd_tools.go +++ b/pkg/util/etcd_tools.go @@ -153,7 +153,7 @@ func (h *EtcdHelper) SetObj(key string, obj interface{}) error { // Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update. // See the comment for AtomicUpdate for more detail. -type EtcdUpdateFunc func() (interface{}, error) +type EtcdUpdateFunc func(input interface{}) (output interface{}, err error) // AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects. // Note, tryUpdate may be called more than once. @@ -161,33 +161,42 @@ type EtcdUpdateFunc func() (interface{}, error) // Example: // // h := &util.EtcdHelper{client} -// var currentObj MyType -// err := h.AtomicUpdate("myKey", ¤tObj, func() (interface{}, error) { +// err := h.AtomicUpdate("myKey", &MyType{}, func(input interface{}) (interface{}, error) { // // Before this function is called, currentObj has been reset to etcd's current // // contents for "myKey". // +// cur := input.(*MyType) // Gauranteed to work. +// // // Make a *modification*. -// currentObj.Counter++ +// cur.Counter++ // // // Return the modified object. Return an error to stop iterating. -// return currentObj, nil +// return cur, nil // }) // -func (h *EtcdHelper) AtomicUpdate(key string, objPtr interface{}, tryUpdate EtcdUpdateFunc) error { +func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate EtcdUpdateFunc) error { + pt := reflect.TypeOf(ptrToType) + if pt.Kind() != reflect.Ptr { + // Panic is appropriate, because this is a programming error. + panic("need ptr to type") + } for { - origBody, index, err := h.bodyAndExtractObj(key, objPtr, true) + obj := reflect.New(pt.Elem()).Interface() + origBody, index, err := h.bodyAndExtractObj(key, obj, true) if err != nil { return err } - ret, err := tryUpdate() + ret, err := tryUpdate(obj) if err != nil { return err } // First time this key has been used, just set. + // TODO: This is racy. Fix when our client supports prevExist. See: + // https://github.com/coreos/etcd/blob/master/Documentation/api.md#atomic-compare-and-swap if index == 0 { - //return h.SetObj(key, ret) + return h.SetObj(key, ret) } data, err := json.Marshal(ret)