From 5d70a118df80fd8ddb8fbd041af95acae49ae617 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 18 Nov 2022 00:12:50 +0000 Subject: [PATCH] Enable propagration of HasSynced * Add tracker types and tests * Modify ResourceEventHandler interface's OnAdd member * Add additional ResourceEventHandlerDetailedFuncs struct * Fix SharedInformer to let users track HasSynced for their handlers * Fix in-tree controllers which weren't computing HasSynced correctly * Deprecate the cache.Pop function Kubernetes-commit: 8100efc7b3122ad119ee8fa4bbbedef3b90f2e0d --- tools/cache/controller.go | 52 +++++- tools/cache/controller_test.go | 18 +- tools/cache/delta_fifo.go | 7 +- tools/cache/delta_fifo_test.go | 25 ++- tools/cache/fifo.go | 14 +- tools/cache/fifo_test.go | 8 +- tools/cache/processor_listener_test.go | 2 +- tools/cache/shared_informer.go | 68 +++++-- tools/cache/shared_informer_test.go | 27 ++- tools/cache/synctrack/synctrack.go | 116 ++++++++++++ tools/cache/synctrack/synctrack_test.go | 239 ++++++++++++++++++++++++ 11 files changed, 524 insertions(+), 52 deletions(-) create mode 100644 tools/cache/synctrack/synctrack.go create mode 100644 tools/cache/synctrack/synctrack_test.go diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 57b15fea..31b94ea2 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -85,7 +85,7 @@ type Config struct { type ShouldResyncFunc func() bool // ProcessFunc processes a single object. -type ProcessFunc func(obj interface{}) error +type ProcessFunc func(obj interface{}, isInInitialList bool) error // `*controller` implements Controller type controller struct { @@ -215,7 +215,7 @@ func (c *controller) processLoop() { // happen if the watch is closed and misses the delete event and we don't // notice the deletion until the subsequent re-list. type ResourceEventHandler interface { - OnAdd(obj interface{}) + OnAdd(obj interface{}, isInInitialList bool) OnUpdate(oldObj, newObj interface{}) OnDelete(obj interface{}) } @@ -224,6 +224,9 @@ type ResourceEventHandler interface { // as few of the notification functions as you want while still implementing // ResourceEventHandler. This adapter does not remove the prohibition against // modifying the objects. +// +// See ResourceEventHandlerDetailedFuncs if your use needs to propagate +// HasSynced. type ResourceEventHandlerFuncs struct { AddFunc func(obj interface{}) UpdateFunc func(oldObj, newObj interface{}) @@ -231,7 +234,7 @@ type ResourceEventHandlerFuncs struct { } // OnAdd calls AddFunc if it's not nil. -func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { +func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, isInInitialList bool) { if r.AddFunc != nil { r.AddFunc(obj) } @@ -251,6 +254,36 @@ func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { } } +// ResourceEventHandlerDetailedFuncs is exactly like ResourceEventHandlerFuncs +// except its AddFunc accepts the isInInitialList parameter, for propagating +// HasSynced. +type ResourceEventHandlerDetailedFuncs struct { + AddFunc func(obj interface{}, isInInitialList bool) + UpdateFunc func(oldObj, newObj interface{}) + DeleteFunc func(obj interface{}) +} + +// OnAdd calls AddFunc if it's not nil. +func (r ResourceEventHandlerDetailedFuncs) OnAdd(obj interface{}, isInInitialList bool) { + if r.AddFunc != nil { + r.AddFunc(obj, isInInitialList) + } +} + +// OnUpdate calls UpdateFunc if it's not nil. +func (r ResourceEventHandlerDetailedFuncs) OnUpdate(oldObj, newObj interface{}) { + if r.UpdateFunc != nil { + r.UpdateFunc(oldObj, newObj) + } +} + +// OnDelete calls DeleteFunc if it's not nil. +func (r ResourceEventHandlerDetailedFuncs) OnDelete(obj interface{}) { + if r.DeleteFunc != nil { + r.DeleteFunc(obj) + } +} + // FilteringResourceEventHandler applies the provided filter to all events coming // in, ensuring the appropriate nested handler method is invoked. An object // that starts passing the filter after an update is considered an add, and an @@ -262,11 +295,11 @@ type FilteringResourceEventHandler struct { } // OnAdd calls the nested handler only if the filter succeeds -func (r FilteringResourceEventHandler) OnAdd(obj interface{}) { +func (r FilteringResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) { if !r.FilterFunc(obj) { return } - r.Handler.OnAdd(obj) + r.Handler.OnAdd(obj, isInInitialList) } // OnUpdate ensures the proper handler is called depending on whether the filter matches @@ -277,7 +310,7 @@ func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) { case newer && older: r.Handler.OnUpdate(oldObj, newObj) case newer && !older: - r.Handler.OnAdd(newObj) + r.Handler.OnAdd(newObj, false) case !newer && older: r.Handler.OnDelete(oldObj) default: @@ -417,6 +450,7 @@ func processDeltas( clientState Store, transformer TransformFunc, deltas Deltas, + isInInitialList bool, ) error { // from oldest to newest for _, d := range deltas { @@ -440,7 +474,7 @@ func processDeltas( if err := clientState.Add(obj); err != nil { return err } - handler.OnAdd(obj) + handler.OnAdd(obj, isInInitialList) } case Deleted: if err := clientState.Delete(obj); err != nil { @@ -488,9 +522,9 @@ func newInformer( FullResyncPeriod: resyncPeriod, RetryOnError: false, - Process: func(obj interface{}) error { + Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(h, clientState, transformer, deltas) + return processDeltas(h, clientState, transformer, deltas, isInInitialList) } return errors.New("object given as Process argument is not Deltas") }, diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index cf42478e..e59a5c96 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -62,7 +62,7 @@ func Example() { // Let's implement a simple controller that just deletes // everything that comes in. - Process: func(obj interface{}) error { + Process: func(obj interface{}, isInInitialList bool) error { // Obj is from the Pop method of the Queue we make above. newest := obj.(Deltas).Newest() @@ -137,8 +137,8 @@ func ExampleNewInformer() { source, &v1.Pod{}, time.Millisecond*100, - ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { + ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, isInInitialList bool) { source.Delete(obj.(runtime.Object)) }, DeleteFunc: func(obj interface{}) { @@ -213,8 +213,8 @@ func TestHammerController(t *testing.T) { source, &v1.Pod{}, time.Millisecond*100, - ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { recordFunc("add", obj) }, + ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, isInInitialList bool) { recordFunc("add", obj) }, UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) }, DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) }, }, @@ -416,8 +416,8 @@ func TestPanicPropagated(t *testing.T) { source, &v1.Pod{}, time.Millisecond*100, - ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { + ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, isInInitialList bool) { // Create a panic. panic("Just panic.") }, @@ -526,8 +526,8 @@ func TestTransformingInformer(t *testing.T) { source, &v1.Pod{}, 0, - ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { recordEvent(watch.Added, nil, obj) }, + ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) }, DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) }, }, diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 0c13a41f..c4f2de7b 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -271,6 +271,10 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { func (f *DeltaFIFO) HasSynced() bool { f.lock.Lock() defer f.lock.Unlock() + return f.hasSynced_locked() +} + +func (f *DeltaFIFO) hasSynced_locked() bool { return f.populated && f.initialPopulationCount == 0 } @@ -526,6 +530,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.cond.Wait() } + isInInitialList := !f.hasSynced_locked() id := f.queue[0] f.queue = f.queue[1:] depth := len(f.queue) @@ -551,7 +556,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"}) defer trace.LogIfLong(100 * time.Millisecond) } - err := process(item) + err := process(item, isInInitialList) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index f17240da..902fcaed 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -125,7 +125,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("foo", 10)) - _, err := f.Pop(func(obj interface{}) error { + _, err := f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -138,7 +138,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) { t.Fatalf("object should have been requeued: %t %v", ok, err) } - _, err = f.Pop(func(obj interface{}) error { + _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -151,7 +151,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) { t.Fatalf("object should have been requeued: %t %v", ok, err) } - _, err = f.Pop(func(obj interface{}) error { + _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -480,6 +480,18 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { } } +// pop2 captures both parameters, unlike Pop(). +func pop2[T any](queue Queue) (T, bool) { + var result interface{} + var isList bool + queue.Pop(func(obj interface{}, isInInitialList bool) error { + result = obj + isList = isInInitialList + return nil + }) + return result.(T), isList +} + func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KeyFunction: testFifoObjectKeyFunc, @@ -501,10 +513,13 @@ func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { if f.HasSynced() { t.Errorf("Expected HasSynced to be false") } - cur := Pop(f).(Deltas) + cur, initial := pop2[Deltas](f) if e, a := expected, cur; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } + if initial != true { + t.Error("Expected initial list item") + } } if !f.HasSynced() { t.Errorf("Expected HasSynced to be true") @@ -676,7 +691,7 @@ func TestDeltaFIFO_PopShouldUnblockWhenClosed(t *testing.T) { const jobs = 10 for i := 0; i < jobs; i++ { go func() { - f.Pop(func(obj interface{}) error { + f.Pop(func(obj interface{}, isInInitialList bool) error { return nil }) c <- struct{}{} diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index 8f331378..dd13c4ea 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -25,7 +25,7 @@ import ( // PopProcessFunc is passed to Pop() method of Queue interface. // It is supposed to process the accumulator popped from the queue. -type PopProcessFunc func(interface{}) error +type PopProcessFunc func(obj interface{}, isInInitialList bool) error // ErrRequeue may be returned by a PopProcessFunc to safely requeue // the current item. The value of Err will be returned from Pop. @@ -82,9 +82,12 @@ type Queue interface { // Pop is helper function for popping from Queue. // WARNING: Do NOT use this function in non-test code to avoid races // unless you really really really really know what you are doing. +// +// NOTE: This function is deprecated and may be removed in the future without +// additional warning. func Pop(queue Queue) interface{} { var result interface{} - queue.Pop(func(obj interface{}) error { + queue.Pop(func(obj interface{}, isInInitialList bool) error { result = obj return nil }) @@ -149,6 +152,10 @@ func (f *FIFO) Close() { func (f *FIFO) HasSynced() bool { f.lock.Lock() defer f.lock.Unlock() + return f.hasSynced_locked() +} + +func (f *FIFO) hasSynced_locked() bool { return f.populated && f.initialPopulationCount == 0 } @@ -287,6 +294,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.cond.Wait() } + isInInitialList := !f.hasSynced_locked() id := f.queue[0] f.queue = f.queue[1:] if f.initialPopulationCount > 0 { @@ -298,7 +306,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { continue } delete(f.items, id) - err := process(item) + err := process(item, isInInitialList) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index 16b8502f..655f1378 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -76,7 +76,7 @@ func TestFIFO_requeueOnPop(t *testing.T) { f := NewFIFO(testFifoObjectKeyFunc) f.Add(mkFifoObj("foo", 10)) - _, err := f.Pop(func(obj interface{}) error { + _, err := f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -89,7 +89,7 @@ func TestFIFO_requeueOnPop(t *testing.T) { t.Fatalf("object should have been requeued: %t %v", ok, err) } - _, err = f.Pop(func(obj interface{}) error { + _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -102,7 +102,7 @@ func TestFIFO_requeueOnPop(t *testing.T) { t.Fatalf("object should have been requeued: %t %v", ok, err) } - _, err = f.Pop(func(obj interface{}) error { + _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { if obj.(testFifoObject).name != "foo" { t.Fatalf("unexpected object: %#v", obj) } @@ -289,7 +289,7 @@ func TestFIFO_PopShouldUnblockWhenClosed(t *testing.T) { const jobs = 10 for i := 0; i < jobs; i++ { go func() { - f.Pop(func(obj interface{}) error { + f.Pop(func(obj interface{}, isInInitialList bool) error { return nil }) c <- struct{}{} diff --git a/tools/cache/processor_listener_test.go b/tools/cache/processor_listener_test.go index 1da73420..fd658197 100644 --- a/tools/cache/processor_listener_test.go +++ b/tools/cache/processor_listener_test.go @@ -39,7 +39,7 @@ func BenchmarkListener(b *testing.B) { AddFunc: func(obj interface{}) { swg.Done() }, - }, 0, 0, time.Now(), 1024*1024) + }, 0, 0, time.Now(), 1024*1024, func() bool { return true }) var wg wait.Group defer wg.Wait() // Wait for .run and .pop to stop defer close(pl.addCh) // Tell .run and .pop to stop diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 0e39f2a7..2717bde5 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache/synctrack" "k8s.io/utils/buffer" "k8s.io/utils/clock" @@ -132,11 +133,13 @@ import ( // state, except that its ResourceVersion is replaced with a // ResourceVersion in which the object is actually absent. type SharedInformer interface { - // AddEventHandler adds an event handler to the shared informer using the shared informer's resync - // period. Events to a single handler are delivered sequentially, but there is no coordination - // between different handlers. - // It returns a registration handle for the handler that can be used to remove - // the handler again. + // AddEventHandler adds an event handler to the shared informer using + // the shared informer's resync period. Events to a single handler are + // delivered sequentially, but there is no coordination between + // different handlers. + // It returns a registration handle for the handler that can be used to + // remove the handler again, or to tell if the handler is synced (has + // seen every item in the initial list). AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) // AddEventHandlerWithResyncPeriod adds an event handler to the // shared informer with the requested resync period; zero means @@ -169,6 +172,10 @@ type SharedInformer interface { // HasSynced returns true if the shared informer's store has been // informed by at least one full LIST of the authoritative state // of the informer's object collection. This is unrelated to "resync". + // + // Note that this doesn't tell you if an individual handler is synced!! + // For that, please call HasSynced on the handle returned by + // AddEventHandler. HasSynced() bool // LastSyncResourceVersion is the resource version observed when last synced with the underlying // store. The value returned is not synchronized with access to the underlying store and is not @@ -213,7 +220,14 @@ type SharedInformer interface { // Opaque interface representing the registration of ResourceEventHandler for // a SharedInformer. Must be supplied back to the same SharedInformer's // `RemoveEventHandler` to unregister the handlers. -type ResourceEventHandlerRegistration interface{} +// +// Also used to tell if the handler is synced (has had all items in the initial +// list delivered). +type ResourceEventHandlerRegistration interface { + // HasSynced reports if both the parent has synced and all pre-sync + // events have been delivered. + HasSynced() bool +} // SharedIndexInformer provides add and get Indexers ability based on SharedInformer. type SharedIndexInformer interface { @@ -409,7 +423,8 @@ type updateNotification struct { } type addNotification struct { - newObj interface{} + newObj interface{} + isInInitialList bool } type deleteNotification struct { @@ -588,7 +603,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv } } - listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) + listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced) if !s.started { return s.processor.addListener(listener), nil @@ -604,27 +619,35 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv handle := s.processor.addListener(listener) for _, item := range s.indexer.List() { - listener.add(addNotification{newObj: item}) + // Note that we enqueue these notifications with the lock held + // and before returning the handle. That means there is never a + // chance for anyone to call the handle's HasSynced method in a + // state when it would falsely return true (i.e., when the + // shared informer is synced but it has not observed an Add + // with isInitialList being true, nor when the thread + // processing notifications somehow goes faster than this + // thread adding them and the counter is temporarily zero). + listener.add(addNotification{newObj: item, isInInitialList: true}) } return handle, nil } -func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { +func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() if deltas, ok := obj.(Deltas); ok { - return processDeltas(s, s.indexer, s.transform, deltas) + return processDeltas(s, s.indexer, s.transform, deltas, isInInitialList) } return errors.New("object given as Process argument is not Deltas") } // Conforms to ResourceEventHandler -func (s *sharedIndexInformer) OnAdd(obj interface{}) { +func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) { // Invocation of this function is locked under s.blockDeltas, so it is // save to distribute the notification s.cacheMutationDetector.AddObject(obj) - s.processor.distribute(addNotification{newObj: obj}, false) + s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false) } // Conforms to ResourceEventHandler @@ -846,6 +869,8 @@ type processorListener struct { handler ResourceEventHandler + syncTracker *synctrack.SingleFileTracker + // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed. // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications // added until we OOM. @@ -876,11 +901,18 @@ type processorListener struct { resyncLock sync.Mutex } -func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener { +// HasSynced returns true if the source informer has synced, and all +// corresponding events have been delivered. +func (p *processorListener) HasSynced() bool { + return p.syncTracker.HasSynced() +} + +func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener { ret := &processorListener{ nextCh: make(chan interface{}), addCh: make(chan interface{}), handler: handler, + syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced}, pendingNotifications: *buffer.NewRingGrowing(bufferSize), requestedResyncPeriod: requestedResyncPeriod, resyncPeriod: resyncPeriod, @@ -892,6 +924,9 @@ func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, res } func (p *processorListener) add(notification interface{}) { + if a, ok := notification.(addNotification); ok && a.isInInitialList { + p.syncTracker.Start() + } p.addCh <- notification } @@ -937,7 +972,10 @@ func (p *processorListener) run() { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: - p.handler.OnAdd(notification.newObj) + p.handler.OnAdd(notification.newObj, notification.isInInitialList) + if notification.isInInitialList { + p.syncTracker.Finished() + } case deleteNotification: p.handler.OnDelete(notification.oldObj) default: diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 2676e8f5..71f154b0 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -52,7 +52,7 @@ func newTestListener(name string, resyncPeriod time.Duration, expected ...string return l } -func (l *testListener) OnAdd(obj interface{}) { +func (l *testListener) OnAdd(obj interface{}, isInInitialList bool) { l.handle(obj) } @@ -68,7 +68,6 @@ func (l *testListener) handle(obj interface{}) { fmt.Printf("%s: handle: %v\n", l.name, key) l.lock.Lock() defer l.lock.Unlock() - objectMeta, _ := meta.Accessor(obj) l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName()) } @@ -649,8 +648,8 @@ func TestSharedInformerHandlerAbuse(t *testing.T) { worker := func() { // Keep adding and removing handler // Make sure no duplicate events? - funcs := ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) {}, + funcs := ResourceEventHandlerDetailedFuncs{ + AddFunc: func(obj interface{}, isInInitialList bool) {}, UpdateFunc: func(oldObj, newObj interface{}) {}, DeleteFunc: func(obj interface{}) {}, } @@ -902,9 +901,13 @@ func TestAddWhileActive(t *testing.T) { // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) listener1 := newTestListener("originalListener", 0, "pod1") - listener2 := newTestListener("originalListener", 0, "pod1", "pod2") + listener2 := newTestListener("listener2", 0, "pod1", "pod2") handle1, _ := informer.AddEventHandler(listener1) + if handle1.HasSynced() { + t.Error("Synced before Run??") + } + stop := make(chan struct{}) defer close(stop) @@ -916,7 +919,17 @@ func TestAddWhileActive(t *testing.T) { return } + if !handle1.HasSynced() { + t.Error("Not synced after Run??") + } + + listener2.lock.Lock() // ensure we observe it before it has synced handle2, _ := informer.AddEventHandler(listener2) + if handle2.HasSynced() { + t.Error("Synced before processing anything?") + } + listener2.lock.Unlock() // permit it to proceed and sync + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) if !listener2.ok() { @@ -924,6 +937,10 @@ func TestAddWhileActive(t *testing.T) { return } + if !handle2.HasSynced() { + t.Error("Not synced even after processing?") + } + if !isRegistered(informer, handle1) { t.Errorf("handle1 is not active") return diff --git a/tools/cache/synctrack/synctrack.go b/tools/cache/synctrack/synctrack.go new file mode 100644 index 00000000..c488b497 --- /dev/null +++ b/tools/cache/synctrack/synctrack.go @@ -0,0 +1,116 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package synctrack contains utilities for helping controllers track whether +// they are "synced" or not, that is, whether they have processed all items +// from the informer's initial list. +package synctrack + +import ( + "sync" + "sync/atomic" + + "k8s.io/apimachinery/pkg/util/sets" +) + +// AsyncTracker helps propagate HasSynced in the face of multiple worker threads. +type AsyncTracker[T comparable] struct { + UpstreamHasSynced func() bool + + lock sync.Mutex + waiting sets.Set[T] +} + +// Start should be called prior to processing each key which is part of the +// initial list. +func (t *AsyncTracker[T]) Start(key T) { + t.lock.Lock() + defer t.lock.Unlock() + if t.waiting == nil { + t.waiting = sets.New[T](key) + } else { + t.waiting.Insert(key) + } +} + +// Finished should be called when finished processing a key which was part of +// the initial list. Since keys are tracked individually, nothing bad happens +// if you call Finished without a corresponding call to Start. This makes it +// easier to use this in combination with e.g. queues which don't make it easy +// to plumb through the isInInitialList boolean. +func (t *AsyncTracker[T]) Finished(key T) { + t.lock.Lock() + defer t.lock.Unlock() + if t.waiting != nil { + t.waiting.Delete(key) + } +} + +// HasSynced returns true if the source is synced and every key present in the +// initial list has been processed. This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. +func (t *AsyncTracker[T]) HasSynced() bool { + // Call UpstreamHasSynced first: it might take a lock, which might take + // a significant amount of time, and we can't hold our lock while + // waiting on that or a user is likely to get a deadlock. + if !t.UpstreamHasSynced() { + return false + } + t.lock.Lock() + defer t.lock.Unlock() + return t.waiting.Len() == 0 +} + +// SingleFileTracker helps propagate HasSynced when events are processed in +// order (i.e. via a queue). +type SingleFileTracker struct { + UpstreamHasSynced func() bool + + count int64 +} + +// Start should be called prior to processing each key which is part of the +// initial list. +func (t *SingleFileTracker) Start() { + atomic.AddInt64(&t.count, 1) +} + +// Finished should be called when finished processing a key which was part of +// the initial list. You must never call Finished() before (or without) its +// corresponding Start(), that is a logic error that could cause HasSynced to +// return a wrong value. To help you notice this should it happen, Finished() +// will panic if the internal counter goes negative. +func (t *SingleFileTracker) Finished() { + result := atomic.AddInt64(&t.count, -1) + if result < 0 { + panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value") + } +} + +// HasSynced returns true if the source is synced and every key present in the +// initial list has been processed. This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. +func (t *SingleFileTracker) HasSynced() bool { + // Call UpstreamHasSynced first: it might take a lock, which might take + // a significant amount of time, and we don't want to then act on a + // stale count value. + if !t.UpstreamHasSynced() { + return false + } + return atomic.LoadInt64(&t.count) <= 0 +} diff --git a/tools/cache/synctrack/synctrack_test.go b/tools/cache/synctrack/synctrack_test.go new file mode 100644 index 00000000..4cf089e2 --- /dev/null +++ b/tools/cache/synctrack/synctrack_test.go @@ -0,0 +1,239 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package synctrack + +import ( + "strings" + "sync" + "time" + + "testing" +) + +func testSingleFileFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { + tracker := SingleFileTracker{ + UpstreamHasSynced: upstreamHasSynced, + } + return tracker.Start, tracker.Finished, tracker.HasSynced +} + +func testAsyncFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { + tracker := AsyncTracker[string]{ + UpstreamHasSynced: upstreamHasSynced, + } + return func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced +} + +func TestBasicLogic(t *testing.T) { + table := []struct { + name string + construct func(func() bool) (func(), func(), func() bool) + }{ + {"SingleFile", testSingleFileFuncs}, + {"Async", testAsyncFuncs}, + } + + for _, entry := range table { + t.Run(entry.name, func(t *testing.T) { + table := []struct { + synced bool + start bool + finish bool + expectSynced bool + }{ + {false, true, true, false}, + {true, true, false, false}, + {false, true, false, false}, + {true, true, true, true}, + } + for _, tt := range table { + Start, Finished, HasSynced := entry.construct(func() bool { return tt.synced }) + if tt.start { + Start() + } + if tt.finish { + Finished() + } + got := HasSynced() + if e, a := tt.expectSynced, got; e != a { + t.Errorf("for %#v got %v (wanted %v)", tt, a, e) + } + } + }) + } +} + +func TestAsyncLocking(t *testing.T) { + aft := AsyncTracker[int]{UpstreamHasSynced: func() bool { return true }} + + var wg sync.WaitGroup + for _, i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} { + wg.Add(1) + go func(i int) { + aft.Start(i) + go func() { + aft.Finished(i) + wg.Done() + }() + }(i) + } + wg.Wait() + if !aft.HasSynced() { + t.Errorf("async tracker must have made a threading error?") + } + +} + +func TestSingleFileCounting(t *testing.T) { + sft := SingleFileTracker{UpstreamHasSynced: func() bool { return true }} + + for i := 0; i < 100; i++ { + sft.Start() + } + if sft.HasSynced() { + t.Fatal("Unexpectedly synced?") + } + for i := 0; i < 99; i++ { + sft.Finished() + } + if sft.HasSynced() { + t.Fatal("Unexpectedly synced?") + } + + sft.Finished() + if !sft.HasSynced() { + t.Fatal("Unexpectedly not synced?") + } + + // Calling an extra time will panic. + func() { + defer func() { + x := recover() + if x == nil { + t.Error("no panic?") + return + } + msg, ok := x.(string) + if !ok { + t.Errorf("unexpected panic value: %v", x) + return + } + if !strings.Contains(msg, "negative counter") { + t.Errorf("unexpected panic message: %v", msg) + return + } + }() + sft.Finished() + }() + + // Negative counter still means it is synced + if !sft.HasSynced() { + t.Fatal("Unexpectedly not synced?") + } +} + +func TestSingleFile(t *testing.T) { + table := []struct { + synced bool + starts int + stops int + expectSynced bool + }{ + {false, 1, 1, false}, + {true, 1, 0, false}, + {false, 1, 0, false}, + {true, 1, 1, true}, + } + for _, tt := range table { + sft := SingleFileTracker{UpstreamHasSynced: func() bool { return tt.synced }} + for i := 0; i < tt.starts; i++ { + sft.Start() + } + for i := 0; i < tt.stops; i++ { + sft.Finished() + } + got := sft.HasSynced() + if e, a := tt.expectSynced, got; e != a { + t.Errorf("for %#v got %v (wanted %v)", tt, a, e) + } + } + +} + +func TestNoStaleValue(t *testing.T) { + table := []struct { + name string + construct func(func() bool) (func(), func(), func() bool) + }{ + {"SingleFile", testSingleFileFuncs}, + {"Async", testAsyncFuncs}, + } + + for _, entry := range table { + t.Run(entry.name, func(t *testing.T) { + var lock sync.Mutex + upstreamHasSynced := func() bool { + lock.Lock() + defer lock.Unlock() + return true + } + + Start, Finished, HasSynced := entry.construct(upstreamHasSynced) + + // Ordinarily the corresponding lock would be held and you wouldn't be + // able to call this function at this point. + if !HasSynced() { + t.Fatal("Unexpectedly not synced??") + } + + Start() + if HasSynced() { + t.Fatal("Unexpectedly synced??") + } + Finished() + if !HasSynced() { + t.Fatal("Unexpectedly not synced??") + } + + // Now we will prove that if the lock is held, you can't get a false + // HasSynced return. + lock.Lock() + + // This goroutine calls HasSynced + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if HasSynced() { + t.Error("Unexpectedly synced??") + } + }() + + // This goroutine increments + unlocks. The sleep is to bias the + // runtime such that the other goroutine usually wins (it needs to work + // in both orderings, this one is more likely to be buggy). + go func() { + time.Sleep(time.Millisecond) + Start() + lock.Unlock() + }() + + wg.Wait() + }) + } + +}