From 6297232112debed49f44078e78e0950731d9e495 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Mon, 21 Dec 2015 10:03:18 +0100 Subject: [PATCH] Fix race in EtcdWatcher --- pkg/storage/etcd/etcd_watcher.go | 49 ++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index 9487728a459..cfe14f49c15 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -144,23 +144,42 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st defer util.HandleCrash() defer close(w.etcdError) defer close(w.etcdIncoming) - if resourceVersion == 0 { - latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.etcdIncoming) - if err != nil { - w.etcdError <- err - return - } - resourceVersion = latest - } - opts := etcd.WatcherOptions{ - Recursive: w.list, - AfterIndex: resourceVersion, + // We need to be prepared that Stop() can be called at any time. + // It can potentially also be called, even before this function is called. + // If that is the case, we simply skip all the code here. + // See #18928 for more details. + var watcher etcd.Watcher + returned := func() bool { + w.stopLock.Lock() + defer w.stopLock.Unlock() + if w.stopped { + // Watcher has already been stopped - don't even initiate it here. + return true + } + // Perform initialization of watcher under lock - we want to avoid the situation where + // Stop() is called in the meantime (which in tests can cause etcd termination and + // strange behavior here). 
+ if resourceVersion == 0 { + latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.etcdIncoming) + if err != nil { + w.etcdError <- err + return true + } + resourceVersion = latest + } + + opts := etcd.WatcherOptions{ + Recursive: w.list, + AfterIndex: resourceVersion, + } + watcher = client.Watcher(key, &opts) + w.ctx, w.cancel = context.WithCancel(ctx) + return false + }() + if returned { + return } - watcher := client.Watcher(key, &opts) - w.stopLock.Lock() - w.ctx, w.cancel = context.WithCancel(ctx) - w.stopLock.Unlock() for { resp, err := watcher.Next(w.ctx)