diff --git a/tools/watch/informerwatcher.go b/tools/watch/informerwatcher.go index 35a34694..4ccc4b49 100644 --- a/tools/watch/informerwatcher.go +++ b/tools/watch/informerwatcher.go @@ -58,8 +58,10 @@ func (t *ticketer) WaitForTicket(ticket uint64, f func()) { // NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface // so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. -func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) { +// it also returns a channel you can use to wait for the informers to fully shutdown. +func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { ch := make(chan watch.Event) + doneCh := make(chan struct{}) w := watch.NewProxyWatcher(ch) t := newTicketer() @@ -107,8 +109,9 @@ func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) ( }, cache.Indexers{}) go func() { + defer close(doneCh) informer.Run(w.StopChan()) }() - return indexer, informer, w + return indexer, informer, w, doneCh } diff --git a/tools/watch/informerwatcher_test.go b/tools/watch/informerwatcher_test.go index e94b4d25..b5a09f0c 100644 --- a/tools/watch/informerwatcher_test.go +++ b/tools/watch/informerwatcher_test.go @@ -188,7 +188,7 @@ func TestNewInformerWatcher(t *testing.T) { return fake.Core().Secrets("").Watch(options) }, } - _, _, w := NewIndexerInformerWatcher(lw, &corev1.Secret{}) + _, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{}) var result []watch.Event loop: @@ -227,9 +227,7 @@ func TestNewInformerWatcher(t *testing.T) { // Stop before reading all the data to make sure the informer can deal with closed channel w.Stop() - // Wait a bit to see if the informer won't panic - // TODO: Try to figure out a more reliable mechanism than time.Sleep (https://github.com/kubernetes/kubernetes/pull/50102/files#r184716591) - time.Sleep(1 * time.Second) + <-done }) } diff --git a/tools/watch/until.go b/tools/watch/until.go index aa4bbc21..54134371 100644 --- a/tools/watch/until.go +++ b/tools/watch/until.go @@ -108,7 +108,10 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions // The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: // waiting for object reaching a state, "small" controllers, ... func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { - indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType) + indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType) + // We need to wait for the internal informers to fully stop so it's easier to reason about + // and it works with non-thread safe clients. + defer func() { <-done }() // Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and // let UntilWithoutRetry to stop it defer watcher.Stop()