diff --git a/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go index 35a34694939..4ccc4b49a9c 100644 --- a/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go +++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go @@ -58,8 +58,10 @@ func (t *ticketer) WaitForTicket(ticket uint64, f func()) { // NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface // so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. -func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) { +// it also returns a channel you can use to wait for the informers to fully shutdown. +func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) { ch := make(chan watch.Event) + doneCh := make(chan struct{}) w := watch.NewProxyWatcher(ch) t := newTicketer() @@ -107,8 +109,9 @@ func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) ( }, cache.Indexers{}) go func() { + defer close(doneCh) informer.Run(w.StopChan()) }() - return indexer, informer, w + return indexer, informer, w, doneCh } diff --git a/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go index e94b4d25638..b5a09f0c321 100644 --- a/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go +++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go @@ -188,7 +188,7 @@ func TestNewInformerWatcher(t *testing.T) { return fake.Core().Secrets("").Watch(options) }, } - _, _, w := NewIndexerInformerWatcher(lw, &corev1.Secret{}) + _, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{}) var result []watch.Event loop: @@ -227,9 +227,7 @@ func TestNewInformerWatcher(t *testing.T) { // Stop before reading all the data to make sure the informer can deal with closed channel w.Stop() - // Wait a bit to see if the informer won't panic - // TODO: Try to figure out a more reliable mechanism than time.Sleep (https://github.com/kubernetes/kubernetes/pull/50102/files#r184716591) - time.Sleep(1 * time.Second) + <-done }) } diff --git a/staging/src/k8s.io/client-go/tools/watch/until.go b/staging/src/k8s.io/client-go/tools/watch/until.go index aa4bbc21169..541343711d1 100644 --- a/staging/src/k8s.io/client-go/tools/watch/until.go +++ b/staging/src/k8s.io/client-go/tools/watch/until.go @@ -108,7 +108,10 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions // The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: // waiting for object reaching a state, "small" controllers, ... func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { - indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType) + indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType) + // We need to wait for the internal informers to fully stop so it's easier to reason about + // and it works with non-thread safe clients. + defer func() { <-done }() // Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and // let UntilWithoutRetry to stop it defer watcher.Stop() diff --git a/test/integration/apimachinery/watch_restart_test.go b/test/integration/apimachinery/watch_restart_test.go index 547ab8dd402..1bd288788a9 100644 --- a/test/integration/apimachinery/watch_restart_test.go +++ b/test/integration/apimachinery/watch_restart_test.go @@ -62,6 +62,8 @@ func normalizeInformerOutputFunc(initialVal string) func(output []string) []stri } } +func noop() {} + func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { // Has to be longer than 5 seconds timeout := 2 * time.Minute @@ -154,18 +156,19 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { name string succeed bool secret *v1.Secret - getWatcher func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) + getWatcher func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error, func()) normalizeOutputFunc func(referenceOutput []string) []string }{ { name: "regular watcher should fail", succeed: false, secret: newTestSecret("secret-01"), - getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) { + getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error, func()) { options := metav1.ListOptions{ ResourceVersion: secret.ResourceVersion, } - return getWatchFunc(c, secret)(options) + w, err := getWatchFunc(c, secret)(options) + return w, err, noop }, // regular watcher; unfortunately destined to fail normalizeOutputFunc: noopNormalization, }, @@ -173,7 +176,7 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { name: "InformerWatcher survives closed watches", succeed: true, secret: newTestSecret("secret-03"), - getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) { + getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error, func()) { lw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return getListFunc(c, secret)(options), nil @@ -182,8 +185,8 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { return getWatchFunc(c, secret)(options) }, } - _, _, w := watchtools.NewIndexerInformerWatcher(lw, &v1.Secret{}) - return w, nil + _, _, w, done := watchtools.NewIndexerInformerWatcher(lw, &v1.Secret{}) + return w, nil, func() { <-done } }, normalizeOutputFunc: normalizeInformerOutputFunc(initialCount), }, @@ -202,10 +205,11 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { t.Fatalf("Failed to create testing secret %s/%s: %v", tc.secret.Namespace, tc.secret.Name, err) } - watcher, err := tc.getWatcher(c, secret) + watcher, err, doneFn := tc.getWatcher(c, secret) if err != nil { t.Fatalf("Failed to create watcher: %v", err) } + defer doneFn() var referenceOutput []string var output []string