diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index 62276ca412e..93c1f7a9ae5 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -100,7 +100,8 @@ type etcdWatcher struct { userStop chan struct{} stopped bool stopLock sync.Mutex - // wg is used to avoid calls to etcd after Stop() + // wg is used to avoid calls to etcd after Stop(), and to make sure + // that the translate goroutine is not leaked. wg sync.WaitGroup // Injectable for testing. Send the event down the outgoing channel. @@ -146,7 +147,17 @@ func newEtcdWatcher( ctx: nil, cancel: nil, } - w.emit = func(e watch.Event) { w.outgoing <- e } + w.emit = func(e watch.Event) { + // Give up on user stop, without this we leak a lot of goroutines in tests. + select { + case w.outgoing <- e: + case <-w.userStop: + } + } + // translate will call done. We need to Add() here because otherwise, + // if Stop() gets called before translate gets started, there'd be a + // problem. + w.wg.Add(1) go w.translate() return w } @@ -256,6 +267,7 @@ var ( // translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be // called as a goroutine. func (w *etcdWatcher) translate() { + defer w.wg.Done() defer close(w.outgoing) defer utilruntime.HandleCrash()