From 6464774a9b94f6e8376e11d015fd55e98457e74c Mon Sep 17 00:00:00 2001 From: Mikhail Mazurskiy Date: Wed, 7 Jun 2017 09:44:16 +1000 Subject: [PATCH] Refactor Start functions into an object --- .../k8s.io/apimachinery/pkg/util/wait/wait.go | 32 +++++++++++-------- .../client-go/tools/cache/controller.go | 4 +-- .../client-go/tools/cache/shared_informer.go | 16 +++++----- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go index 0a07bb46730..0997de8065d 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go @@ -20,6 +20,7 @@ import ( "context" "errors" "math/rand" + "sync" "time" "k8s.io/apimachinery/pkg/util/runtime" @@ -37,33 +38,36 @@ var ForeverTestTimeout = time.Second * 30 // NeverStop may be passed to Until to make it never stop. var NeverStop <-chan struct{} = make(chan struct{}) -// Group is an interface to decouple code from sync.WaitGroup. -type Group interface { - Add(delta int) - Done() +// Group allows to start a group of goroutines and wait for their completion. +type Group struct { + wg sync.WaitGroup } -// StartWithChannelWithinGroup adds 1 to the group, starts f in a new goroutine and calls g.Done once f has finished. +func (g *Group) Wait() { + g.wg.Wait() +} + +// StartWithChannel starts f in a new goroutine in the group. // stopCh is passed to f as an argument. f should stop when stopCh is available. -func StartWithChannelWithinGroup(stopCh <-chan struct{}, g Group, f func(stopCh <-chan struct{})) { - StartWithinGroup(g, func() { +func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) { + g.Start(func() { f(stopCh) }) } -// StartWithContextWithinGroup adds 1 to the group, starts f in a new goroutine and calls g.Done once f has finished. +// StartWithContext starts f in a new goroutine in the group. // ctx is passed to f as an argument. f should stop when ctx.Done() is available. -func StartWithContextWithinGroup(ctx context.Context, g Group, f func(context.Context)) { - StartWithinGroup(g, func() { +func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) { + g.Start(func() { f(ctx) }) } -// StartWithinGroup adds 1 to the group, starts f in a new goroutine and calls g.Done once f has finished. -func StartWithinGroup(g Group, f func()) { - g.Add(1) +// Start starts f in a new goroutine in the group. +func (g *Group) Start(f func()) { + g.wg.Add(1) go func() { - defer g.Done() + defer g.wg.Done() f() }() } diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index b8bbf5231ee..e7b98befadf 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -116,10 +116,10 @@ func (c *controller) Run(stopCh <-chan struct{}) { c.reflector = r c.reflectorMutex.Unlock() - var wg sync.WaitGroup + var wg wait.Group defer wg.Wait() - wait.StartWithChannelWithinGroup(stopCh, &wg, r.Run) + wg.StartWithChannel(stopCh, r.Run) wait.Until(c.processLoop, time.Second, stopCh) } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index e985167814a..62009646efe 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -147,7 +147,7 @@ type sharedIndexInformer struct { // stopCh is the channel used to stop the main Run process. We have to track it so that // late joiners can have a proper stop stopCh <-chan struct{} - wg sync.WaitGroup + wg wait.Group } // dummyController hides the fact that a SharedInformer is different from a dedicated one @@ -211,8 +211,8 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer s.wg.Wait() - wait.StartWithChannelWithinGroup(stopCh, &s.wg, s.cacheMutationDetector.Run) - wait.StartWithChannelWithinGroup(stopCh, &s.wg, s.processor.run) + s.wg.StartWithChannel(stopCh, s.cacheMutationDetector.Run) + s.wg.StartWithChannel(stopCh, s.processor.run) s.controller.Run(stopCh) } @@ -327,8 +327,8 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv s.processor.addListener(listener) - wait.StartWithChannelWithinGroup(s.stopCh, &s.wg, listener.run) - wait.StartWithChannelWithinGroup(s.stopCh, &s.wg, listener.pop) + s.wg.StartWithChannel(s.stopCh, listener.run) + s.wg.StartWithChannel(s.stopCh, listener.pop) items := s.indexer.List() for i := range items { @@ -398,13 +398,13 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) { } func (p *sharedProcessor) run(stopCh <-chan struct{}) { - var wg sync.WaitGroup + var wg wait.Group func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { - wait.StartWithChannelWithinGroup(stopCh, &wg, listener.run) - wait.StartWithChannelWithinGroup(stopCh, &wg, listener.pop) + wg.StartWithChannel(stopCh, listener.run) + wg.StartWithChannel(stopCh, listener.pop) } }() wg.Wait()