address review comments

asd
This commit is contained in:
Alexander Zielenski 2022-08-08 11:44:37 -07:00
parent 063ef090e7
commit 5f372e1867
No known key found for this signature in database
GPG Key ID: 754BC11B447F7843
2 changed files with 29 additions and 25 deletions

View File

@ -663,8 +663,8 @@ type sharedProcessor struct {
} }
func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener { func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener {
p.listenersLock.Lock() p.listenersLock.RLock()
defer p.listenersLock.Unlock() defer p.listenersLock.RUnlock()
if p.listeners == nil { if p.listeners == nil {
return nil return nil
@ -744,16 +744,19 @@ func (p *sharedProcessor) run(stopCh <-chan struct{}) {
}() }()
<-stopCh <-stopCh
p.listenersLock.Lock() func() {
defer p.listenersLock.Unlock() p.listenersLock.Lock()
for listener := range p.listeners { defer p.listenersLock.Unlock()
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop for listener := range p.listeners {
} close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
p.wg.Wait() // Wait for all .pop() and .run() to stop }
// Wipe out list of listeners since they are now closed // Wipe out list of listeners since they are now closed
// (processorListener cannot be re-used) // (processorListener cannot be re-used)
p.listeners = nil p.listeners = nil
}()
p.wg.Wait() // Wait for all .pop() and .run() to stop
} }
// shouldResync queries every listener to determine if any of them need a resync, based on each // shouldResync queries every listener to determine if any of them need a resync, based on each

View File

@ -208,6 +208,7 @@ func TestResyncCheckPeriod(t *testing.T) {
// create the shared informer and resync every 12 hours // create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer) informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer)
gl := informer.processor.getListener
clock := testingclock.NewFakeClock(time.Now()) clock := testingclock.NewFakeClock(time.Now())
informer.clock = clock informer.clock = clock
@ -215,60 +216,60 @@ func TestResyncCheckPeriod(t *testing.T) {
// listener 1, never resync // listener 1, never resync
listener1 := newTestListener("listener1", 0) listener1 := newTestListener("listener1", 0)
listener1Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) handler1, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a { if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
// listener 2, resync every minute // listener 2, resync every minute
listener2 := newTestListener("listener2", 1*time.Minute) listener2 := newTestListener("listener2", 1*time.Minute)
listener2Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) handler2, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a { if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
// listener 3, resync every 55 seconds // listener 3, resync every 55 seconds
listener3 := newTestListener("listener3", 55*time.Second) listener3 := newTestListener("listener3", 55*time.Second)
listener3Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) handler3, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a { if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 55*time.Second, informer.processor.getListener(listener3Registration).resyncPeriod; e != a { if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
// listener 4, resync every 5 seconds // listener 4, resync every 5 seconds
listener4 := newTestListener("listener4", 5*time.Second) listener4 := newTestListener("listener4", 5*time.Second)
listener4Registration, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod) handler4, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod)
if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a { if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := time.Duration(0), informer.processor.getListener(listener1Registration).resyncPeriod; e != a { if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 1*time.Minute, informer.processor.getListener(listener2Registration).resyncPeriod; e != a { if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 55*time.Second, informer.processor.getListener(listener3Registration).resyncPeriod; e != a { if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
if e, a := 5*time.Second, informer.processor.getListener(listener4Registration).resyncPeriod; e != a { if e, a := 5*time.Second, gl(handler4).resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a) t.Errorf("expected %d, got %d", e, a)
} }
} }