mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-23 05:37:13 +00:00
Merge pull request #73080 from tnozicka/make-until-with-sync-wait-for-informers
Make UntilWithSync wait for integrated informers to stop Kubernetes-commit: f788854e98105cd054b8c86feba30c69de6194bc
This commit is contained in:
commit
e3fddcc5ac
@ -58,8 +58,10 @@ func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
|
||||
|
||||
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
|
||||
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
|
||||
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) {
|
||||
// it also returns a channel you can use to wait for the informers to fully shutdown.
|
||||
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
|
||||
ch := make(chan watch.Event)
|
||||
doneCh := make(chan struct{})
|
||||
w := watch.NewProxyWatcher(ch)
|
||||
t := newTicketer()
|
||||
|
||||
@ -107,8 +109,9 @@ func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (
|
||||
}, cache.Indexers{})
|
||||
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
informer.Run(w.StopChan())
|
||||
}()
|
||||
|
||||
return indexer, informer, w
|
||||
return indexer, informer, w, doneCh
|
||||
}
|
||||
|
@ -188,7 +188,7 @@ func TestNewInformerWatcher(t *testing.T) {
|
||||
return fake.Core().Secrets("").Watch(options)
|
||||
},
|
||||
}
|
||||
_, _, w := NewIndexerInformerWatcher(lw, &corev1.Secret{})
|
||||
_, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{})
|
||||
|
||||
var result []watch.Event
|
||||
loop:
|
||||
@ -227,9 +227,7 @@ func TestNewInformerWatcher(t *testing.T) {
|
||||
// Stop before reading all the data to make sure the informer can deal with closed channel
|
||||
w.Stop()
|
||||
|
||||
// Wait a bit to see if the informer won't panic
|
||||
// TODO: Try to figure out a more reliable mechanism than time.Sleep (https://github.com/kubernetes/kubernetes/pull/50102/files#r184716591)
|
||||
time.Sleep(1 * time.Second)
|
||||
<-done
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -108,7 +108,10 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions
|
||||
// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like:
|
||||
// waiting for object reaching a state, "small" controllers, ...
|
||||
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
|
||||
indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType)
|
||||
indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType)
|
||||
// We need to wait for the internal informers to fully stop so it's easier to reason about
|
||||
// and it works with non-thread safe clients.
|
||||
defer func() { <-done }()
|
||||
// Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and
|
||||
// let UntilWithoutRetry to stop it
|
||||
defer watcher.Stop()
|
||||
|
Loading…
Reference in New Issue
Block a user