Make UntilWithSync wait for integrated informers to stop

This commit is contained in:
Tomas Nozicka 2019-01-21 18:30:00 +01:00
parent cd18b8f8a9
commit b762d2c0d7
4 changed files with 22 additions and 14 deletions

View File

@ -58,8 +58,10 @@ func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface // NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method. // so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) { // it also returns a channel you can use to wait for the informers to fully shutdown.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
ch := make(chan watch.Event) ch := make(chan watch.Event)
doneCh := make(chan struct{})
w := watch.NewProxyWatcher(ch) w := watch.NewProxyWatcher(ch)
t := newTicketer() t := newTicketer()
@ -107,8 +109,9 @@ func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (
}, cache.Indexers{}) }, cache.Indexers{})
go func() { go func() {
defer close(doneCh)
informer.Run(w.StopChan()) informer.Run(w.StopChan())
}() }()
return indexer, informer, w return indexer, informer, w, doneCh
} }

View File

@ -188,7 +188,7 @@ func TestNewInformerWatcher(t *testing.T) {
return fake.Core().Secrets("").Watch(options) return fake.Core().Secrets("").Watch(options)
}, },
} }
_, _, w := NewIndexerInformerWatcher(lw, &corev1.Secret{}) _, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{})
var result []watch.Event var result []watch.Event
loop: loop:
@ -227,9 +227,7 @@ func TestNewInformerWatcher(t *testing.T) {
// Stop before reading all the data to make sure the informer can deal with closed channel // Stop before reading all the data to make sure the informer can deal with closed channel
w.Stop() w.Stop()
// Wait a bit to see if the informer won't panic <-done
// TODO: Try to figure out a more reliable mechanism than time.Sleep (https://github.com/kubernetes/kubernetes/pull/50102/files#r184716591)
time.Sleep(1 * time.Second)
}) })
} }

View File

@ -108,7 +108,10 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions
// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like: // The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like:
// waiting for object reaching a state, "small" controllers, ... // waiting for object reaching a state, "small" controllers, ...
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) { func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType) indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType)
// We need to wait for the internal informers to fully stop so it's easier to reason about
// and it works with non-thread safe clients.
defer func() { <-done }()
// Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and // Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and
// let UntilWithoutRetry to stop it // let UntilWithoutRetry to stop it
defer watcher.Stop() defer watcher.Stop()

View File

@ -62,6 +62,8 @@ func normalizeInformerOutputFunc(initialVal string) func(output []string) []stri
} }
} }
func noop() {}
func TestWatchRestartsIfTimeoutNotReached(t *testing.T) { func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
// Has to be longer than 5 seconds // Has to be longer than 5 seconds
timeout := 2 * time.Minute timeout := 2 * time.Minute
@ -154,18 +156,19 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
name string name string
succeed bool succeed bool
secret *v1.Secret secret *v1.Secret
getWatcher func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) getWatcher func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error, func())
normalizeOutputFunc func(referenceOutput []string) []string normalizeOutputFunc func(referenceOutput []string) []string
}{ }{
{ {
name: "regular watcher should fail", name: "regular watcher should fail",
succeed: false, succeed: false,
secret: newTestSecret("secret-01"), secret: newTestSecret("secret-01"),
getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) { getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error, func()) {
options := metav1.ListOptions{ options := metav1.ListOptions{
ResourceVersion: secret.ResourceVersion, ResourceVersion: secret.ResourceVersion,
} }
return getWatchFunc(c, secret)(options) w, err := getWatchFunc(c, secret)(options)
return w, err, noop
}, // regular watcher; unfortunately destined to fail }, // regular watcher; unfortunately destined to fail
normalizeOutputFunc: noopNormalization, normalizeOutputFunc: noopNormalization,
}, },
@ -173,7 +176,7 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
name: "InformerWatcher survives closed watches", name: "InformerWatcher survives closed watches",
succeed: true, succeed: true,
secret: newTestSecret("secret-03"), secret: newTestSecret("secret-03"),
getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) { getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error, func()) {
lw := &cache.ListWatch{ lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return getListFunc(c, secret)(options), nil return getListFunc(c, secret)(options), nil
@ -182,8 +185,8 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
return getWatchFunc(c, secret)(options) return getWatchFunc(c, secret)(options)
}, },
} }
_, _, w := watchtools.NewIndexerInformerWatcher(lw, &v1.Secret{}) _, _, w, done := watchtools.NewIndexerInformerWatcher(lw, &v1.Secret{})
return w, nil return w, nil, func() { <-done }
}, },
normalizeOutputFunc: normalizeInformerOutputFunc(initialCount), normalizeOutputFunc: normalizeInformerOutputFunc(initialCount),
}, },
@ -202,10 +205,11 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
t.Fatalf("Failed to create testing secret %s/%s: %v", tc.secret.Namespace, tc.secret.Name, err) t.Fatalf("Failed to create testing secret %s/%s: %v", tc.secret.Namespace, tc.secret.Name, err)
} }
watcher, err := tc.getWatcher(c, secret) watcher, err, doneFn := tc.getWatcher(c, secret)
if err != nil { if err != nil {
t.Fatalf("Failed to create watcher: %v", err) t.Fatalf("Failed to create watcher: %v", err)
} }
defer doneFn()
var referenceOutput []string var referenceOutput []string
var output []string var output []string