Support per-handler resync periods

Add the ability for each event handler of a shared informer to specify
its own resync period. If not specified, a handler will resync at the
informer's default interval.
This commit is contained in:
Andy Goldstein 2017-02-07 16:08:32 -05:00
parent 0fdb33b25e
commit c891998d00
6 changed files with 492 additions and 58 deletions

View File

@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/clock"
) )
// Config contains all the settings for a Controller. // Config contains all the settings for a Controller.
@ -50,6 +51,11 @@ type Config struct {
// queue. // queue.
FullResyncPeriod time.Duration FullResyncPeriod time.Duration
// ShouldResync, if specified, is invoked when the controller's reflector determines the next
// periodic sync should occur. If this returns true, it means the reflector should proceed with
// the resync.
ShouldResync ShouldResyncFunc
// If true, when Process() returns an error, re-enqueue the object. // If true, when Process() returns an error, re-enqueue the object.
// TODO: add interface to let you inject a delay/backoff or drop // TODO: add interface to let you inject a delay/backoff or drop
// the object completely if desired. Pass the object in // the object completely if desired. Pass the object in
@ -57,6 +63,11 @@ type Config struct {
RetryOnError bool RetryOnError bool
} }
// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
// resync or not. It can be used by a shared informer to support multiple event handlers with custom
// resync periods.
type ShouldResyncFunc func() bool
// ProcessFunc processes a single object. // ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error type ProcessFunc func(obj interface{}) error
@ -65,6 +76,7 @@ type controller struct {
config Config config Config
reflector *Reflector reflector *Reflector
reflectorMutex sync.RWMutex reflectorMutex sync.RWMutex
clock clock.Clock
} }
type Controller interface { type Controller interface {
@ -77,6 +89,7 @@ type Controller interface {
func New(c *Config) Controller { func New(c *Config) Controller {
ctlr := &controller{ ctlr := &controller{
config: *c, config: *c,
clock: &clock.RealClock{},
} }
return ctlr return ctlr
} }
@ -92,6 +105,8 @@ func (c *controller) Run(stopCh <-chan struct{}) {
c.config.Queue, c.config.Queue,
c.config.FullResyncPeriod, c.config.FullResyncPeriod,
) )
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock() c.reflectorMutex.Lock()
c.reflector = r c.reflector = r

View File

@ -26,7 +26,7 @@ import (
// TestPopReleaseLock tests that when processor listener blocks on chan, // TestPopReleaseLock tests that when processor listener blocks on chan,
// it should release the lock for pendingNotifications. // it should release the lock for pendingNotifications.
func TestPopReleaseLock(t *testing.T) { func TestPopReleaseLock(t *testing.T) {
pl := newProcessListener(nil) pl := newProcessListener(nil, 0, 0, time.Now())
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
// make pop() block on nextCh: waiting for receiver to get notification. // make pop() block on nextCh: waiting for receiver to get notification.

View File

@ -41,6 +41,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/clock"
) )
// Reflector watches a specified resource and causes all changes to be reflected in the given store. // Reflector watches a specified resource and causes all changes to be reflected in the given store.
@ -58,8 +59,9 @@ type Reflector struct {
// the beginning of the next one. // the beginning of the next one.
period time.Duration period time.Duration
resyncPeriod time.Duration resyncPeriod time.Duration
// now() returns current time - exposed for testing purposes ShouldResync func() bool
now func() time.Time // clock allows tests to manipulate time
clock clock.Clock
// lastSyncResourceVersion is the resource version token last // lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store // observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store // it is thread safe, but not synchronized with the underlying store
@ -103,7 +105,7 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
expectedType: reflect.TypeOf(expectedType), expectedType: reflect.TypeOf(expectedType),
period: time.Second, period: time.Second,
resyncPeriod: resyncPeriod, resyncPeriod: resyncPeriod,
now: time.Now, clock: &clock.RealClock{},
} }
return r return r
} }
@ -223,8 +225,8 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// always fail so we end up listing frequently. Then, if we don't // always fail so we end up listing frequently. Then, if we don't
// manually stop the timer, we could end up with many timers active // manually stop the timer, we could end up with many timers active
// concurrently. // concurrently.
t := time.NewTimer(r.resyncPeriod) t := r.clock.NewTimer(r.resyncPeriod)
return t.C, t.Stop return t.C(), t.Stop
} }
// ListAndWatch first lists all items and get the resource version at the moment of call, // ListAndWatch first lists all items and get the resource version at the moment of call,
@ -270,11 +272,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
case <-cancelCh: case <-cancelCh:
return return
} }
if r.ShouldResync == nil || r.ShouldResync() {
glog.V(4).Infof("%s: forcing resync", r.name) glog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil { if err := r.store.Resync(); err != nil {
resyncerrc <- err resyncerrc <- err
return return
} }
}
cleanup() cleanup()
resyncCh, cleanup = r.resyncChan() resyncCh, cleanup = r.resyncChan()
} }
@ -334,7 +338,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
// watchHandler watches w and keeps *resourceVersion up to date. // watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := time.Now() start := r.clock.Now()
eventCount := 0 eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way // Stopping the watcher should be idempotent and if we return from this function there's no way
@ -393,7 +397,7 @@ loop:
} }
} }
watchDuration := time.Now().Sub(start) watchDuration := r.clock.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 { if watchDuration < 1*time.Second && eventCount == 0 {
glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name) glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
return errors.New("very short watch") return errors.New("very short watch")

View File

@ -24,28 +24,40 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/clock"
"github.com/golang/glog" "github.com/golang/glog"
) )
// if you use this, there is one behavior change compared to a standard Informer. // SharedInformer has a shared data cache and is capable of distributing notifications for changes
// When you receive a notification, the cache will be AT LEAST as fresh as the // to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
// notification, but it MAY be more fresh. You should NOT depend on the contents // one behavior change compared to a standard Informer. When you receive a notification, the cache
// of the cache exactly matching the notification you've received in handler // will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend
// functions. If there was a create, followed by a delete, the cache may NOT // on the contents of the cache exactly matching the notification you've received in handler
// have your item. This has advantages over the broadcaster since it allows us // functions. If there was a create, followed by a delete, the cache may NOT have your item. This
// to share a common cache across many controllers. Extending the broadcaster // has advantages over the broadcaster since it allows us to share a common cache across many
// would have required us keep duplicate caches for each watch. // controllers. Extending the broadcaster would have required us keep duplicate caches for each
// watch.
type SharedInformer interface { type SharedInformer interface {
// events to a single handler are delivered sequentially, but there is no coordination between different handlers // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// You may NOT add a handler *after* the SharedInformer is running. That will result in an error being returned. // period. Events to a single handler are delivered sequentially, but there is no coordination
// TODO we should try to remove this restriction eventually. // between different handlers.
AddEventHandler(handler ResourceEventHandler) error AddEventHandler(handler ResourceEventHandler)
// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
// specified resync period. Events to a single handler are delivered sequentially, but there is
// no coordination between different handlers.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// GetStore returns the Store.
GetStore() Store GetStore() Store
// GetController gives back a synthetic interface that "votes" to start the informer // GetController gives back a synthetic interface that "votes" to start the informer
GetController() Controller GetController() Controller
// Run starts the shared informer, which will be stopped when stopCh is closed.
Run(stopCh <-chan struct{}) Run(stopCh <-chan struct{})
// HasSynced returns true if the shared informer's store has synced.
HasSynced() bool HasSynced() bool
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
LastSyncResourceVersion() string LastSyncResourceVersion() string
} }
@ -57,23 +69,22 @@ type SharedIndexInformer interface {
} }
// NewSharedInformer creates a new instance for the listwatcher. // NewSharedInformer creates a new instance for the listwatcher.
// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
// be shared amongst all consumers.
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer { func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{}) return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
} }
// NewSharedIndexInformer creates a new instance for the listwatcher. // NewSharedIndexInformer creates a new instance for the listwatcher.
// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
// be shared amongst all consumers. realClock := &clock.RealClock{}
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
sharedIndexInformer := &sharedIndexInformer{ sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{}, processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw, listerWatcher: lw,
objectType: objType, objectType: objType,
fullResyncPeriod: resyncPeriod, resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)), cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,
} }
return sharedIndexInformer return sharedIndexInformer
} }
@ -116,7 +127,16 @@ type sharedIndexInformer struct {
// This block is tracked to handle late initialization of the controller // This block is tracked to handle late initialization of the controller
listerWatcher ListerWatcher listerWatcher ListerWatcher
objectType runtime.Object objectType runtime.Object
fullResyncPeriod time.Duration
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time.Duration
// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
// value).
defaultEventHandlerResyncPeriod time.Duration
// clock allows for testability
clock clock.Clock
started bool started bool
startedLock sync.Mutex startedLock sync.Mutex
@ -171,8 +191,9 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
Queue: fifo, Queue: fifo,
ListerWatcher: s.listerWatcher, ListerWatcher: s.listerWatcher,
ObjectType: s.objectType, ObjectType: s.objectType,
FullResyncPeriod: s.fullResyncPeriod, FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false, RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas, Process: s.HandleDeltas,
} }
@ -182,6 +203,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer s.startedLock.Unlock() defer s.startedLock.Unlock()
s.controller = New(cfg) s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true s.started = true
}() }()
@ -240,14 +262,56 @@ func (s *sharedIndexInformer) GetController() Controller {
return &dummyController{informer: s} return &dummyController{informer: s}
} }
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) error { func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
func determineResyncPeriod(desired, check time.Duration) time.Duration {
if desired == 0 {
return desired
}
if check == 0 {
glog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
return 0
}
if desired < check {
glog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
return check
}
return desired
}
const minimumResyncPeriod = 1 * time.Second
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock() s.startedLock.Lock()
defer s.startedLock.Unlock() defer s.startedLock.Unlock()
if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
resyncPeriod = minimumResyncPeriod
}
if resyncPeriod < s.resyncCheckPeriod {
if s.started {
glog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
resyncPeriod = s.resyncCheckPeriod
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
// accordingly
s.resyncCheckPeriod = resyncPeriod
s.processor.resyncCheckPeriodChanged(resyncPeriod)
}
}
}
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now())
if !s.started { if !s.started {
listener := newProcessListener(handler) s.processor.addListener(listener)
s.processor.listeners = append(s.processor.listeners, listener) return
return nil
} }
// in order to safely join, we have to // in order to safely join, we have to
@ -258,8 +322,7 @@ func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) erro
s.blockDeltas.Lock() s.blockDeltas.Lock()
defer s.blockDeltas.Unlock() defer s.blockDeltas.Unlock()
listener := newProcessListener(handler) s.processor.addListener(listener)
s.processor.listeners = append(s.processor.listeners, listener)
go listener.run(s.stopCh) go listener.run(s.stopCh)
go listener.pop(s.stopCh) go listener.pop(s.stopCh)
@ -268,8 +331,6 @@ func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) erro
for i := range items { for i := range items {
listener.add(addNotification{newObj: items[i]}) listener.add(addNotification{newObj: items[i]})
} }
return nil
} }
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
@ -280,45 +341,101 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
for _, d := range obj.(Deltas) { for _, d := range obj.(Deltas) {
switch d.Type { switch d.Type {
case Sync, Added, Updated: case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object) s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil { if err := s.indexer.Update(d.Object); err != nil {
return err return err
} }
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}) s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else { } else {
if err := s.indexer.Add(d.Object); err != nil { if err := s.indexer.Add(d.Object); err != nil {
return err return err
} }
s.processor.distribute(addNotification{newObj: d.Object}) s.processor.distribute(addNotification{newObj: d.Object}, isSync)
} }
case Deleted: case Deleted:
if err := s.indexer.Delete(d.Object); err != nil { if err := s.indexer.Delete(d.Object); err != nil {
return err return err
} }
s.processor.distribute(deleteNotification{oldObj: d.Object}) s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
} }
} }
return nil return nil
} }
type sharedProcessor struct { type sharedProcessor struct {
listenersLock sync.RWMutex
listeners []*processorListener listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
} }
func (p *sharedProcessor) distribute(obj interface{}) { func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners { for _, listener := range p.listeners {
listener.add(obj) listener.add(obj)
} }
} }
}
func (p *sharedProcessor) run(stopCh <-chan struct{}) { func (p *sharedProcessor) run(stopCh <-chan struct{}) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners { for _, listener := range p.listeners {
go listener.run(stopCh) go listener.run(stopCh)
go listener.pop(stopCh) go listener.pop(stopCh)
} }
} }
// shouldResync queries every listener to determine if any of them need a resync, based on each
// listener's resyncPeriod.
func (p *sharedProcessor) shouldResync() bool {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.syncingListeners = []*processorListener{}
resyncNeeded := false
now := p.clock.Now()
for _, listener := range p.listeners {
// need to loop through all the listeners to see if they need to resync so we can prepare any
// listeners that are going to be resyncing.
if listener.shouldResync(now) {
resyncNeeded = true
p.syncingListeners = append(p.syncingListeners, listener)
listener.determineNextResync(now)
}
}
return resyncNeeded
}
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
listener.setResyncPeriod(resyncPeriod)
}
}
type processorListener struct { type processorListener struct {
// lock/cond protects access to 'pendingNotifications'. // lock/cond protects access to 'pendingNotifications'.
lock sync.RWMutex lock sync.RWMutex
@ -334,16 +451,32 @@ type processorListener struct {
nextCh chan interface{} nextCh chan interface{}
handler ResourceEventHandler handler ResourceEventHandler
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
resyncPeriod time.Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time.Time
// resyncLock guards access to resyncPeriod and nextResync
resyncLock sync.Mutex
} }
func newProcessListener(handler ResourceEventHandler) *processorListener { func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
ret := &processorListener{ ret := &processorListener{
pendingNotifications: []interface{}{}, pendingNotifications: []interface{}{},
nextCh: make(chan interface{}), nextCh: make(chan interface{}),
handler: handler, handler: handler,
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
} }
ret.cond.L = &ret.lock ret.cond.L = &ret.lock
ret.determineNextResync(now)
return ret return ret
} }
@ -419,3 +552,30 @@ func (p *processorListener) run(stopCh <-chan struct{}) {
} }
} }
} }
// shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
// this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool {
p.resyncLock.Lock()
defer p.resyncLock.Unlock()
if p.resyncPeriod == 0 {
return false
}
return now.After(p.nextResync) || now.Equal(p.nextResync)
}
func (p *processorListener) determineNextResync(now time.Time) {
p.resyncLock.Lock()
defer p.resyncLock.Unlock()
p.nextResync = now.Add(p.resyncPeriod)
}
func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
p.resyncLock.Lock()
defer p.resyncLock.Unlock()
p.resyncPeriod = resyncPeriod
}

View File

@ -0,0 +1,253 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"sync"
"testing"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/pkg/api"
fcache "k8s.io/client-go/tools/cache/testing"
"k8s.io/client-go/util/clock"
)
type testListener struct {
lock sync.RWMutex
resyncPeriod time.Duration
expectedItemNames sets.String
receivedItemNames []string
name string
}
func newTestListener(name string, resyncPeriod time.Duration, expected ...string) *testListener {
l := &testListener{
resyncPeriod: resyncPeriod,
expectedItemNames: sets.NewString(expected...),
name: name,
}
return l
}
func (l *testListener) OnAdd(obj interface{}) {
l.handle(obj)
}
func (l *testListener) OnUpdate(old, new interface{}) {
l.handle(new)
}
func (l *testListener) OnDelete(obj interface{}) {
}
func (l *testListener) handle(obj interface{}) {
key, _ := MetaNamespaceKeyFunc(obj)
fmt.Printf("%s: handle: %v\n", l.name, key)
l.lock.Lock()
defer l.lock.Unlock()
objectMeta, _ := meta.Accessor(obj)
l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName())
}
func (l *testListener) ok() bool {
fmt.Println("polling")
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
if l.satisfiedExpectations() {
return true, nil
}
return false, nil
})
if err != nil {
return false
}
// wait just a bit to allow any unexpected stragglers to come in
fmt.Println("sleeping")
time.Sleep(1 * time.Second)
fmt.Println("final check")
return l.satisfiedExpectations()
}
func (l *testListener) satisfiedExpectations() bool {
l.lock.RLock()
defer l.lock.RUnlock()
return len(l.receivedItemNames) == l.expectedItemNames.Len() && sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames)
}
func TestListenerResyncPeriods(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source.Add(&api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
source.Add(&api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
// create the shared informer and resync every 1s
informer := NewSharedInformer(source, &api.Pod{}, 1*time.Second).(*sharedIndexInformer)
clock := clock.NewFakeClock(time.Now())
informer.clock = clock
informer.processor.clock = clock
// listener 1, never resync
listener1 := newTestListener("listener1", 0, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
// listener 2, resync every 2s
listener2 := newTestListener("listener2", 2*time.Second, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
// listener 3, resync every 3s
listener3 := newTestListener("listener3", 3*time.Second, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
listeners := []*testListener{listener1, listener2, listener3}
stop := make(chan struct{})
defer close(stop)
go informer.Run(stop)
// ensure all listeners got the initial List
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
// reset
for _, listener := range listeners {
listener.receivedItemNames = []string{}
}
// advance so listener2 gets a resync
clock.Step(2 * time.Second)
// make sure listener2 got the resync
if !listener2.ok() {
t.Errorf("%s: expected %v, got %v", listener2.name, listener2.expectedItemNames, listener2.receivedItemNames)
}
// wait a bit to give errant items a chance to go to 1 and 3
time.Sleep(1 * time.Second)
// make sure listeners 1 and 3 got nothing
if len(listener1.receivedItemNames) != 0 {
t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
}
if len(listener3.receivedItemNames) != 0 {
t.Errorf("listener3: should not have resynced (got %d)", len(listener3.receivedItemNames))
}
// reset
for _, listener := range listeners {
listener.receivedItemNames = []string{}
}
// advance so listener3 gets a resync
clock.Step(1 * time.Second)
// make sure listener3 got the resync
if !listener3.ok() {
t.Errorf("%s: expected %v, got %v", listener3.name, listener3.expectedItemNames, listener3.receivedItemNames)
}
// wait a bit to give errant items a chance to go to 1 and 2
time.Sleep(1 * time.Second)
// make sure listeners 1 and 2 got nothing
if len(listener1.receivedItemNames) != 0 {
t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
}
if len(listener2.receivedItemNames) != 0 {
t.Errorf("listener2: should not have resynced (got %d)", len(listener2.receivedItemNames))
}
}
func TestResyncCheckPeriod(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
// create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &api.Pod{}, 12*time.Hour).(*sharedIndexInformer)
clock := clock.NewFakeClock(time.Now())
informer.clock = clock
informer.processor.clock = clock
// listener 1, never resync
listener1 := newTestListener("listener1", 0)
informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
// listener 2, resync every minute
listener2 := newTestListener("listener2", 1*time.Minute)
informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
// listener 3, resync every 55 seconds
listener3 := newTestListener("listener3", 55*time.Second)
informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
// listener 4, resync every 5 seconds
listener4 := newTestListener("listener4", 5*time.Second)
informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod)
if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
if e, a := 5*time.Second, informer.processor.listeners[3].resyncPeriod; e != a {
t.Errorf("expected %d, got %d", e, a)
}
}

2
vendor/BUILD vendored
View File

@ -12403,6 +12403,7 @@ go_test(
"k8s.io/client-go/tools/cache/mutation_detector_test.go", "k8s.io/client-go/tools/cache/mutation_detector_test.go",
"k8s.io/client-go/tools/cache/processor_listener_test.go", "k8s.io/client-go/tools/cache/processor_listener_test.go",
"k8s.io/client-go/tools/cache/reflector_test.go", "k8s.io/client-go/tools/cache/reflector_test.go",
"k8s.io/client-go/tools/cache/shared_informer_test.go",
"k8s.io/client-go/tools/cache/store_test.go", "k8s.io/client-go/tools/cache/store_test.go",
"k8s.io/client-go/tools/cache/undelta_store_test.go", "k8s.io/client-go/tools/cache/undelta_store_test.go",
], ],
@ -12410,6 +12411,7 @@ go_test(
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//vendor:github.com/google/gofuzz", "//vendor:github.com/google/gofuzz",
"//vendor:k8s.io/apimachinery/pkg/api/meta",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/sets", "//vendor:k8s.io/apimachinery/pkg/util/sets",