diff --git a/tools/watch/retrywatcher.go b/tools/watch/retrywatcher.go index d81dc435..8431d02f 100644 --- a/tools/watch/retrywatcher.go +++ b/tools/watch/retrywatcher.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "net/http" + "sync" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -53,6 +54,7 @@ type RetryWatcher struct { stopChan chan struct{} doneChan chan struct{} minRestartDelay time.Duration + stopChanLock sync.Mutex } // NewRetryWatcher creates a new RetryWatcher. @@ -286,7 +288,15 @@ func (rw *RetryWatcher) ResultChan() <-chan watch.Event { // Stop implements Interface. func (rw *RetryWatcher) Stop() { - close(rw.stopChan) + rw.stopChanLock.Lock() + defer rw.stopChanLock.Unlock() + + // Prevent closing an already closed channel to prevent a panic + select { + case <-rw.stopChan: + default: + close(rw.stopChan) + } } // Done allows the caller to be notified when Retry watcher stops. diff --git a/tools/watch/retrywatcher_test.go b/tools/watch/retrywatcher_test.go index fff3a46c..297661aa 100644 --- a/tools/watch/retrywatcher_test.go +++ b/tools/watch/retrywatcher_test.go @@ -585,6 +585,8 @@ func TestRetryWatcherToFinishWithUnreadEvents(t *testing.T) { // Give the watcher a chance to get to sending events (blocking) time.Sleep(10 * time.Millisecond) + watcher.Stop() + // Verify a second stop does not cause a panic watcher.Stop() maxTime := time.Second