Fix comments. Add timeout to integration test; don't make travis run for 15 minutes any more.

This commit is contained in:
Daniel Smith 2014-06-27 15:02:06 -07:00
parent 999ea50c2a
commit 9a0f89170e
3 changed files with 33 additions and 16 deletions

View File

@ -41,6 +41,12 @@ func main() {
util.InitLogs()
defer util.FlushLogs()
go func() {
defer util.FlushLogs()
time.Sleep(3 * time.Minute)
glog.Fatalf("This test has timed out.")
}()
manifestUrl := ServeCachedManifestFile()
// Setup
servers := []string{"http://localhost:4001"}

View File

@ -108,16 +108,18 @@ func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
}
contKey := makeContainerKey(machine)
var manifests []api.ContainerManifest
err = registry.helper().AtomicUpdate(contKey, &manifests, func() (interface{}, error) {
err = registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*[]api.ContainerManifest)
return append(manifests, manifest), nil
})
if err != nil {
// Don't strand stuff.
registry.etcdClient.Delete(podKey, false)
return err
_, err2 := registry.etcdClient.Delete(podKey, false)
if err2 != nil {
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
}
}
return nil
return err
}
func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {
@ -143,8 +145,8 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
// Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine)
var manifests []api.ContainerManifest
return registry.helper().AtomicUpdate(contKey, &manifests, func() (interface{}, error) {
return registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*[]api.ContainerManifest)
newManifests := make([]api.ContainerManifest, 0, len(manifests))
found := false
for _, manifest := range manifests {

View File

@ -153,7 +153,7 @@ func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
// Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update.
// See the comment for AtomicUpdate for more detail.
type EtcdUpdateFunc func() (interface{}, error)
type EtcdUpdateFunc func(input interface{}) (output interface{}, err error)
// AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects.
// Note, tryUpdate may be called more than once.
@ -161,33 +161,42 @@ type EtcdUpdateFunc func() (interface{}, error)
// Example:
//
// h := &util.EtcdHelper{client}
// var currentObj MyType
// err := h.AtomicUpdate("myKey", &currentObj, func() (interface{}, error) {
// err := h.AtomicUpdate("myKey", &MyType{}, func(input interface{}) (interface{}, error) {
// // Before this function is called, currentObj has been reset to etcd's current
// // contents for "myKey".
//
// cur := input.(*MyType) // Gauranteed to work.
//
// // Make a *modification*.
// currentObj.Counter++
// cur.Counter++
//
// // Return the modified object. Return an error to stop iterating.
// return currentObj, nil
// return cur, nil
// })
//
func (h *EtcdHelper) AtomicUpdate(key string, objPtr interface{}, tryUpdate EtcdUpdateFunc) error {
func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate EtcdUpdateFunc) error {
pt := reflect.TypeOf(ptrToType)
if pt.Kind() != reflect.Ptr {
// Panic is appropriate, because this is a programming error.
panic("need ptr to type")
}
for {
origBody, index, err := h.bodyAndExtractObj(key, objPtr, true)
obj := reflect.New(pt.Elem()).Interface()
origBody, index, err := h.bodyAndExtractObj(key, obj, true)
if err != nil {
return err
}
ret, err := tryUpdate()
ret, err := tryUpdate(obj)
if err != nil {
return err
}
// First time this key has been used, just set.
// TODO: This is racy. Fix when our client supports prevExist. See:
// https://github.com/coreos/etcd/blob/master/Documentation/api.md#atomic-compare-and-swap
if index == 0 {
//return h.SetObj(key, ret)
return h.SetObj(key, ret)
}
data, err := json.Marshal(ret)