Refactor Start functions into an object

This commit is contained in:
Mikhail Mazurskiy 2017-06-07 09:44:16 +10:00
parent d11a9973cf
commit 6464774a9b
No known key found for this signature in database
GPG Key ID: 93551ECC96E2F568
3 changed files with 28 additions and 24 deletions

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"errors" "errors"
"math/rand" "math/rand"
"sync"
"time" "time"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
@ -37,33 +38,36 @@ var ForeverTestTimeout = time.Second * 30
// NeverStop may be passed to Until to make it never stop. // NeverStop may be passed to Until to make it never stop.
var NeverStop <-chan struct{} = make(chan struct{}) var NeverStop <-chan struct{} = make(chan struct{})
// Group is an interface to decouple code from sync.WaitGroup. // Group allows to start a group of goroutines and wait for their completion.
type Group interface { type Group struct {
Add(delta int) wg sync.WaitGroup
Done()
} }
// StartWithChannelWithinGroup adds 1 to the group, starts f in a new goroutine and calls g.Done once f has finished. func (g *Group) Wait() {
g.wg.Wait()
}
// StartWithChannel starts f in a new goroutine in the group.
// stopCh is passed to f as an argument. f should stop when stopCh is available. // stopCh is passed to f as an argument. f should stop when stopCh is available.
func StartWithChannelWithinGroup(stopCh <-chan struct{}, g Group, f func(stopCh <-chan struct{})) { func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) {
StartWithinGroup(g, func() { g.Start(func() {
f(stopCh) f(stopCh)
}) })
} }
// StartWithContextWithinGroup adds 1 to the group, starts f in a new goroutine and calls g.Done once f has finished. // StartWithContext starts f in a new goroutine in the group.
// ctx is passed to f as an argument. f should stop when ctx.Done() is available. // ctx is passed to f as an argument. f should stop when ctx.Done() is available.
func StartWithContextWithinGroup(ctx context.Context, g Group, f func(context.Context)) { func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) {
StartWithinGroup(g, func() { g.Start(func() {
f(ctx) f(ctx)
}) })
} }
// StartWithinGroup adds 1 to the group, starts f in a new goroutine and calls g.Done once f has finished. // Start starts f in a new goroutine in the group.
func StartWithinGroup(g Group, f func()) { func (g *Group) Start(f func()) {
g.Add(1) g.wg.Add(1)
go func() { go func() {
defer g.Done() defer g.wg.Done()
f() f()
}() }()
} }

View File

@ -116,10 +116,10 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.reflector = r c.reflector = r
c.reflectorMutex.Unlock() c.reflectorMutex.Unlock()
var wg sync.WaitGroup var wg wait.Group
defer wg.Wait() defer wg.Wait()
wait.StartWithChannelWithinGroup(stopCh, &wg, r.Run) wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh) wait.Until(c.processLoop, time.Second, stopCh)
} }

View File

@ -147,7 +147,7 @@ type sharedIndexInformer struct {
// stopCh is the channel used to stop the main Run process. We have to track it so that // stopCh is the channel used to stop the main Run process. We have to track it so that
// late joiners can have a proper stop // late joiners can have a proper stop
stopCh <-chan struct{} stopCh <-chan struct{}
wg sync.WaitGroup wg wait.Group
} }
// dummyController hides the fact that a SharedInformer is different from a dedicated one // dummyController hides the fact that a SharedInformer is different from a dedicated one
@ -211,8 +211,8 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer s.wg.Wait() defer s.wg.Wait()
wait.StartWithChannelWithinGroup(stopCh, &s.wg, s.cacheMutationDetector.Run) s.wg.StartWithChannel(stopCh, s.cacheMutationDetector.Run)
wait.StartWithChannelWithinGroup(stopCh, &s.wg, s.processor.run) s.wg.StartWithChannel(stopCh, s.processor.run)
s.controller.Run(stopCh) s.controller.Run(stopCh)
} }
@ -327,8 +327,8 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
s.processor.addListener(listener) s.processor.addListener(listener)
wait.StartWithChannelWithinGroup(s.stopCh, &s.wg, listener.run) s.wg.StartWithChannel(s.stopCh, listener.run)
wait.StartWithChannelWithinGroup(s.stopCh, &s.wg, listener.pop) s.wg.StartWithChannel(s.stopCh, listener.pop)
items := s.indexer.List() items := s.indexer.List()
for i := range items { for i := range items {
@ -398,13 +398,13 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
} }
func (p *sharedProcessor) run(stopCh <-chan struct{}) { func (p *sharedProcessor) run(stopCh <-chan struct{}) {
var wg sync.WaitGroup var wg wait.Group
func() { func() {
p.listenersLock.RLock() p.listenersLock.RLock()
defer p.listenersLock.RUnlock() defer p.listenersLock.RUnlock()
for _, listener := range p.listeners { for _, listener := range p.listeners {
wait.StartWithChannelWithinGroup(stopCh, &wg, listener.run) wg.StartWithChannel(stopCh, listener.run)
wait.StartWithChannelWithinGroup(stopCh, &wg, listener.pop) wg.StartWithChannel(stopCh, listener.pop)
} }
}() }()
wg.Wait() wg.Wait()