Enable propagation of HasSynced

* Add tracker types and tests
* Modify ResourceEventHandler interface's OnAdd member
* Add additional ResourceEventHandlerDetailedFuncs struct
* Fix SharedInformer to let users track HasSynced for their handlers (usage sketch below)
* Fix in-tree controllers which weren't computing HasSynced correctly
* Deprecate the cache.Pop function

Kubernetes-commit: 8100efc7b3122ad119ee8fa4bbbedef3b90f2e0d
Author: Daniel Smith, 2022-11-18 00:12:50 +00:00 (committed by Kubernetes Publisher)
parent e7e7d01afd
commit 5d70a118df
11 changed files with 524 additions and 52 deletions
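For orientation, a minimal sketch (not part of this commit) of how a consumer can use the new per-handler HasSynced; the function name waitForHandlerSync and the informer/stop-channel wiring are illustrative assumptions:

package example

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// waitForHandlerSync registers a handler and blocks until that specific
// handler has received every item from the informer's initial list. The
// informer and stop channel are assumed to be set up by the caller.
func waitForHandlerSync(informer cache.SharedIndexInformer, stopCh <-chan struct{}) error {
	handle, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) { fmt.Printf("add: %v\n", obj) },
	})
	if err != nil {
		return err
	}
	// handle.HasSynced becomes true only once the informer has synced AND this
	// handler has processed all of its initial-list notifications;
	// informer.HasSynced alone does not guarantee the latter.
	if !cache.WaitForCacheSync(stopCh, handle.HasSynced) {
		return fmt.Errorf("handler never synced")
	}
	return nil
}

handle.HasSynced can be passed to WaitForCacheSync because the registration's HasSynced has the same func() bool shape as InformerSynced.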

View File

@ -85,7 +85,7 @@ type Config struct {
type ShouldResyncFunc func() bool
// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error
type ProcessFunc func(obj interface{}, isInInitialList bool) error
// `*controller` implements Controller
type controller struct {
@ -215,7 +215,7 @@ func (c *controller) processLoop() {
// happen if the watch is closed and misses the delete event and we don't
// notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnAdd(obj interface{}, isInInitialList bool)
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
@ -224,6 +224,9 @@ type ResourceEventHandler interface {
// as few of the notification functions as you want while still implementing
// ResourceEventHandler. This adapter does not remove the prohibition against
// modifying the objects.
//
// See ResourceEventHandlerDetailedFuncs if your use needs to propagate
// HasSynced.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
@ -231,7 +234,7 @@ type ResourceEventHandlerFuncs struct {
}
// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, isInInitialList bool) {
if r.AddFunc != nil {
r.AddFunc(obj)
}
@ -251,6 +254,36 @@ func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
}
}
// ResourceEventHandlerDetailedFuncs is exactly like ResourceEventHandlerFuncs
// except its AddFunc accepts the isInInitialList parameter, for propagating
// HasSynced.
type ResourceEventHandlerDetailedFuncs struct {
AddFunc func(obj interface{}, isInInitialList bool)
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
// OnAdd calls AddFunc if it's not nil.
func (r ResourceEventHandlerDetailedFuncs) OnAdd(obj interface{}, isInInitialList bool) {
if r.AddFunc != nil {
r.AddFunc(obj, isInInitialList)
}
}
// OnUpdate calls UpdateFunc if it's not nil.
func (r ResourceEventHandlerDetailedFuncs) OnUpdate(oldObj, newObj interface{}) {
if r.UpdateFunc != nil {
r.UpdateFunc(oldObj, newObj)
}
}
// OnDelete calls DeleteFunc if it's not nil.
func (r ResourceEventHandlerDetailedFuncs) OnDelete(obj interface{}) {
if r.DeleteFunc != nil {
r.DeleteFunc(obj)
}
}
// FilteringResourceEventHandler applies the provided filter to all events coming
// in, ensuring the appropriate nested handler method is invoked. An object
// that starts passing the filter after an update is considered an add, and an
@ -262,11 +295,11 @@ type FilteringResourceEventHandler struct {
}
// OnAdd calls the nested handler only if the filter succeeds
func (r FilteringResourceEventHandler) OnAdd(obj interface{}) {
func (r FilteringResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
if !r.FilterFunc(obj) {
return
}
r.Handler.OnAdd(obj)
r.Handler.OnAdd(obj, isInInitialList)
}
// OnUpdate ensures the proper handler is called depending on whether the filter matches
@ -277,7 +310,7 @@ func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
case newer && older:
r.Handler.OnUpdate(oldObj, newObj)
case newer && !older:
r.Handler.OnAdd(newObj)
r.Handler.OnAdd(newObj, false)
case !newer && older:
r.Handler.OnDelete(oldObj)
default:
@ -417,6 +450,7 @@ func processDeltas(
clientState Store,
transformer TransformFunc,
deltas Deltas,
isInInitialList bool,
) error {
// from oldest to newest
for _, d := range deltas {
@ -440,7 +474,7 @@ func processDeltas(
if err := clientState.Add(obj); err != nil {
return err
}
handler.OnAdd(obj)
handler.OnAdd(obj, isInInitialList)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
@ -488,9 +522,9 @@ func newInformer(
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(h, clientState, transformer, deltas)
return processDeltas(h, clientState, transformer, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
},
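The ResourceEventHandlerDetailedFuncs adapter introduced above can be consumed like this; a rough sketch, with the handler body being purely illustrative:

package example

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// newStartupAwareHandler returns a handler whose AddFunc can tell replayed
// initial-list objects apart from adds observed after the initial sync.
func newStartupAwareHandler() cache.ResourceEventHandler {
	return cache.ResourceEventHandlerDetailedFuncs{
		AddFunc: func(obj interface{}, isInInitialList bool) {
			if isInInitialList {
				// Object came from the initial LIST; e.g. skip work that only
				// matters for objects created while we are already running.
				return
			}
			fmt.Printf("new object added after sync: %v\n", obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) { /* signature unchanged */ },
		DeleteFunc: func(obj interface{}) { /* signature unchanged */ },
	}
}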

View File

@ -62,7 +62,7 @@ func Example() {
// Let's implement a simple controller that just deletes
// everything that comes in.
Process: func(obj interface{}) error {
Process: func(obj interface{}, isInInitialList bool) error {
// Obj is from the Pop method of the Queue we make above.
newest := obj.(Deltas).Newest()
@ -137,8 +137,8 @@ func ExampleNewInformer() {
source,
&v1.Pod{},
time.Millisecond*100,
ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) {
source.Delete(obj.(runtime.Object))
},
DeleteFunc: func(obj interface{}) {
@ -213,8 +213,8 @@ func TestHammerController(t *testing.T) {
source,
&v1.Pod{},
time.Millisecond*100,
ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { recordFunc("add", obj) },
ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) { recordFunc("add", obj) },
UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) },
DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) },
},
@ -416,8 +416,8 @@ func TestPanicPropagated(t *testing.T) {
source,
&v1.Pod{},
time.Millisecond*100,
ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) {
// Create a panic.
panic("Just panic.")
},
@ -526,8 +526,8 @@ func TestTransformingInformer(t *testing.T) {
source,
&v1.Pod{},
0,
ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { recordEvent(watch.Added, nil, obj) },
ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
},

View File

@ -271,6 +271,10 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.hasSynced_locked()
}
func (f *DeltaFIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
}
@ -526,6 +530,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
@ -551,7 +556,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
err := process(item)
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err

View File

@ -125,7 +125,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
f.Add(mkFifoObj("foo", 10))
_, err := f.Pop(func(obj interface{}) error {
_, err := f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -138,7 +138,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) {
t.Fatalf("object should have been requeued: %t %v", ok, err)
}
_, err = f.Pop(func(obj interface{}) error {
_, err = f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -151,7 +151,7 @@ func TestDeltaFIFO_requeueOnPop(t *testing.T) {
t.Fatalf("object should have been requeued: %t %v", ok, err)
}
_, err = f.Pop(func(obj interface{}) error {
_, err = f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -480,6 +480,18 @@ func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
}
}
// pop2 captures both parameters, unlike Pop().
func pop2[T any](queue Queue) (T, bool) {
var result interface{}
var isList bool
queue.Pop(func(obj interface{}, isInInitialList bool) error {
result = obj
isList = isInInitialList
return nil
})
return result.(T), isList
}
func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
@ -501,10 +513,13 @@ func TestDeltaFIFO_HasSyncedCorrectOnDeletion(t *testing.T) {
if f.HasSynced() {
t.Errorf("Expected HasSynced to be false")
}
cur := Pop(f).(Deltas)
cur, initial := pop2[Deltas](f)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
if initial != true {
t.Error("Expected initial list item")
}
}
if !f.HasSynced() {
t.Errorf("Expected HasSynced to be true")
@ -676,7 +691,7 @@ func TestDeltaFIFO_PopShouldUnblockWhenClosed(t *testing.T) {
const jobs = 10
for i := 0; i < jobs; i++ {
go func() {
f.Pop(func(obj interface{}) error {
f.Pop(func(obj interface{}, isInInitialList bool) error {
return nil
})
c <- struct{}{}

tools/cache/fifo.go (vendored, 14 changes)
View File

@ -25,7 +25,7 @@ import (
// PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the accumulator popped from the queue.
type PopProcessFunc func(interface{}) error
type PopProcessFunc func(obj interface{}, isInInitialList bool) error
// ErrRequeue may be returned by a PopProcessFunc to safely requeue
// the current item. The value of Err will be returned from Pop.
@ -82,9 +82,12 @@ type Queue interface {
// Pop is helper function for popping from Queue.
// WARNING: Do NOT use this function in non-test code to avoid races
// unless you really really really really know what you are doing.
//
// NOTE: This function is deprecated and may be removed in the future without
// additional warning.
func Pop(queue Queue) interface{} {
var result interface{}
queue.Pop(func(obj interface{}) error {
queue.Pop(func(obj interface{}, isInInitialList bool) error {
result = obj
return nil
})
@ -149,6 +152,10 @@ func (f *FIFO) Close() {
func (f *FIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.hasSynced_locked()
}
func (f *FIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
}
@ -287,6 +294,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
@ -298,7 +306,7 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
continue
}
delete(f.items, id)
err := process(item)
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
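With the package-level Pop helper now deprecated, callers are expected to call Pop on the queue themselves with the widened PopProcessFunc; a hedged sketch, where drainOne and handle are made-up names:

package example

import "k8s.io/client-go/tools/cache"

// drainOne pops a single accumulator from the queue using the new
// two-argument PopProcessFunc instead of the deprecated cache.Pop helper.
func drainOne(queue cache.Queue, handle func(obj interface{}, initial bool)) error {
	_, err := queue.Pop(func(obj interface{}, isInInitialList bool) error {
		// Process while the queue still "owns" the item; returning an
		// ErrRequeue here would put it back instead.
		handle(obj, isInInitialList)
		return nil
	})
	return err
}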

View File

@ -76,7 +76,7 @@ func TestFIFO_requeueOnPop(t *testing.T) {
f := NewFIFO(testFifoObjectKeyFunc)
f.Add(mkFifoObj("foo", 10))
_, err := f.Pop(func(obj interface{}) error {
_, err := f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -89,7 +89,7 @@ func TestFIFO_requeueOnPop(t *testing.T) {
t.Fatalf("object should have been requeued: %t %v", ok, err)
}
_, err = f.Pop(func(obj interface{}) error {
_, err = f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -102,7 +102,7 @@ func TestFIFO_requeueOnPop(t *testing.T) {
t.Fatalf("object should have been requeued: %t %v", ok, err)
}
_, err = f.Pop(func(obj interface{}) error {
_, err = f.Pop(func(obj interface{}, isInInitialList bool) error {
if obj.(testFifoObject).name != "foo" {
t.Fatalf("unexpected object: %#v", obj)
}
@ -289,7 +289,7 @@ func TestFIFO_PopShouldUnblockWhenClosed(t *testing.T) {
const jobs = 10
for i := 0; i < jobs; i++ {
go func() {
f.Pop(func(obj interface{}) error {
f.Pop(func(obj interface{}, isInInitialList bool) error {
return nil
})
c <- struct{}{}

View File

@ -39,7 +39,7 @@ func BenchmarkListener(b *testing.B) {
AddFunc: func(obj interface{}) {
swg.Done()
},
}, 0, 0, time.Now(), 1024*1024)
}, 0, 0, time.Now(), 1024*1024, func() bool { return true })
var wg wait.Group
defer wg.Wait() // Wait for .run and .pop to stop
defer close(pl.addCh) // Tell .run and .pop to stop

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache/synctrack"
"k8s.io/utils/buffer"
"k8s.io/utils/clock"
@ -132,11 +133,13 @@ import (
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
// It returns a registration handle for the handler that can be used to remove
// the handler again.
// AddEventHandler adds an event handler to the shared informer using
// the shared informer's resync period. Events to a single handler are
// delivered sequentially, but there is no coordination between
// different handlers.
// It returns a registration handle for the handler that can be used to
// remove the handler again, or to tell if the handler is synced (has
// seen every item in the initial list).
AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
// AddEventHandlerWithResyncPeriod adds an event handler to the
// shared informer with the requested resync period; zero means
@ -169,6 +172,10 @@ type SharedInformer interface {
// HasSynced returns true if the shared informer's store has been
// informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync".
//
// Note that this doesn't tell you if an individual handler is synced!!
// For that, please call HasSynced on the handle returned by
// AddEventHandler.
HasSynced() bool
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
@ -213,7 +220,14 @@ type SharedInformer interface {
// Opaque interface representing the registration of ResourceEventHandler for
// a SharedInformer. Must be supplied back to the same SharedInformer's
// `RemoveEventHandler` to unregister the handlers.
type ResourceEventHandlerRegistration interface{}
//
// Also used to tell if the handler is synced (has had all items in the initial
// list delivered).
type ResourceEventHandlerRegistration interface {
// HasSynced reports if both the parent has synced and all pre-sync
// events have been delivered.
HasSynced() bool
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
@ -409,7 +423,8 @@ type updateNotification struct {
}
type addNotification struct {
newObj interface{}
newObj interface{}
isInInitialList bool
}
type deleteNotification struct {
@ -588,7 +603,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
}
}
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
if !s.started {
return s.processor.addListener(listener), nil
@ -604,27 +619,35 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
handle := s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
// Note that we enqueue these notifications with the lock held
// and before returning the handle. That means there is never a
// chance for anyone to call the handle's HasSynced method in a
// state when it would falsely return true (i.e., when the
// shared informer is synced but it has not observed an Add
// with isInInitialList being true, nor when the thread
// processing notifications somehow goes faster than this
// thread adding them and the counter is temporarily zero).
listener.add(addNotification{newObj: item, isInInitialList: true})
}
return handle, nil
}
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, s.transform, deltas)
return processDeltas(s, s.indexer, s.transform, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
}
// Conforms to ResourceEventHandler
func (s *sharedIndexInformer) OnAdd(obj interface{}) {
func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
// Invocation of this function is locked under s.blockDeltas, so it is
// safe to distribute the notification
s.cacheMutationDetector.AddObject(obj)
s.processor.distribute(addNotification{newObj: obj}, false)
s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
}
// Conforms to ResourceEventHandler
@ -846,6 +869,8 @@ type processorListener struct {
handler ResourceEventHandler
syncTracker *synctrack.SingleFileTracker
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
@ -876,11 +901,18 @@ type processorListener struct {
resyncLock sync.Mutex
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
// HasSynced returns true if the source informer has synced, and all
// corresponding events have been delivered.
func (p *processorListener) HasSynced() bool {
return p.syncTracker.HasSynced()
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
@ -892,6 +924,9 @@ func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, res
}
func (p *processorListener) add(notification interface{}) {
if a, ok := notification.(addNotification); ok && a.isInInitialList {
p.syncTracker.Start()
}
p.addCh <- notification
}
@ -937,7 +972,10 @@ func (p *processorListener) run() {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
p.handler.OnAdd(notification.newObj, notification.isInInitialList)
if notification.isInInitialList {
p.syncTracker.Finished()
}
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
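The per-handler tracking above is what lets controllers compute HasSynced correctly; a sketch of the intended pattern, where podWatcher and reconcile are illustrative names rather than code from this commit:

package example

import "k8s.io/client-go/tools/cache"

// podWatcher is an illustrative controller that reports itself synced only
// once its own handler has seen the informer's entire initial list.
type podWatcher struct {
	registration cache.ResourceEventHandlerRegistration
}

func newPodWatcher(informer cache.SharedIndexInformer, reconcile func(obj interface{})) (*podWatcher, error) {
	w := &podWatcher{}
	var err error
	w.registration, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    reconcile,
		UpdateFunc: func(oldObj, newObj interface{}) { reconcile(newObj) },
		DeleteFunc: reconcile,
	})
	if err != nil {
		return nil, err
	}
	return w, nil
}

// HasSynced no longer conflates "the shared store is filled" with "my handler
// has processed everything": it delegates to the registration returned above.
func (w *podWatcher) HasSynced() bool { return w.registration.HasSynced() }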

View File

@ -52,7 +52,7 @@ func newTestListener(name string, resyncPeriod time.Duration, expected ...string
return l
}
func (l *testListener) OnAdd(obj interface{}) {
func (l *testListener) OnAdd(obj interface{}, isInInitialList bool) {
l.handle(obj)
}
@ -68,7 +68,6 @@ func (l *testListener) handle(obj interface{}) {
fmt.Printf("%s: handle: %v\n", l.name, key)
l.lock.Lock()
defer l.lock.Unlock()
objectMeta, _ := meta.Accessor(obj)
l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName())
}
@ -649,8 +648,8 @@ func TestSharedInformerHandlerAbuse(t *testing.T) {
worker := func() {
// Keep adding and removing handler
// Make sure no duplicate events?
funcs := ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
funcs := ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, isInInitialList bool) {},
UpdateFunc: func(oldObj, newObj interface{}) {},
DeleteFunc: func(obj interface{}) {},
}
@ -902,9 +901,13 @@ func TestAddWhileActive(t *testing.T) {
// create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
listener1 := newTestListener("originalListener", 0, "pod1")
listener2 := newTestListener("originalListener", 0, "pod1", "pod2")
listener2 := newTestListener("listener2", 0, "pod1", "pod2")
handle1, _ := informer.AddEventHandler(listener1)
if handle1.HasSynced() {
t.Error("Synced before Run??")
}
stop := make(chan struct{})
defer close(stop)
@ -916,7 +919,17 @@ func TestAddWhileActive(t *testing.T) {
return
}
if !handle1.HasSynced() {
t.Error("Not synced after Run??")
}
listener2.lock.Lock() // ensure we observe it before it has synced
handle2, _ := informer.AddEventHandler(listener2)
if handle2.HasSynced() {
t.Error("Synced before processing anything?")
}
listener2.lock.Unlock() // permit it to proceed and sync
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
if !listener2.ok() {
@ -924,6 +937,10 @@ func TestAddWhileActive(t *testing.T) {
return
}
if !handle2.HasSynced() {
t.Error("Not synced even after processing?")
}
if !isRegistered(informer, handle1) {
t.Errorf("handle1 is not active")
return

tools/cache/synctrack/synctrack.go (vendored, new file, 116 lines)
View File

@ -0,0 +1,116 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package synctrack contains utilities for helping controllers track whether
// they are "synced" or not, that is, whether they have processed all items
// from the informer's initial list.
package synctrack
import (
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/util/sets"
)
// AsyncTracker helps propagate HasSynced in the face of multiple worker threads.
type AsyncTracker[T comparable] struct {
UpstreamHasSynced func() bool
lock sync.Mutex
waiting sets.Set[T]
}
// Start should be called prior to processing each key which is part of the
// initial list.
func (t *AsyncTracker[T]) Start(key T) {
t.lock.Lock()
defer t.lock.Unlock()
if t.waiting == nil {
t.waiting = sets.New[T](key)
} else {
t.waiting.Insert(key)
}
}
// Finished should be called when finished processing a key which was part of
// the initial list. Since keys are tracked individually, nothing bad happens
// if you call Finished without a corresponding call to Start. This makes it
// easier to use this in combination with e.g. queues which don't make it easy
// to plumb through the isInInitialList boolean.
func (t *AsyncTracker[T]) Finished(key T) {
t.lock.Lock()
defer t.lock.Unlock()
if t.waiting != nil {
t.waiting.Delete(key)
}
}
// HasSynced returns true if the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *AsyncTracker[T]) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we can't hold our lock while
// waiting on that or a user is likely to get a deadlock.
if !t.UpstreamHasSynced() {
return false
}
t.lock.Lock()
defer t.lock.Unlock()
return t.waiting.Len() == 0
}
// SingleFileTracker helps propagate HasSynced when events are processed in
// order (i.e. via a queue).
type SingleFileTracker struct {
UpstreamHasSynced func() bool
count int64
}
// Start should be called prior to processing each key which is part of the
// initial list.
func (t *SingleFileTracker) Start() {
atomic.AddInt64(&t.count, 1)
}
// Finished should be called when finished processing a key which was part of
// the initial list. You must never call Finished() before (or without) its
// corresponding Start(), that is a logic error that could cause HasSynced to
// return a wrong value. To help you notice this should it happen, Finished()
// will panic if the internal counter goes negative.
func (t *SingleFileTracker) Finished() {
result := atomic.AddInt64(&t.count, -1)
if result < 0 {
panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value")
}
}
// HasSynced returns true if the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *SingleFileTracker) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we don't want to then act on a
// stale count value.
if !t.UpstreamHasSynced() {
return false
}
return atomic.LoadInt64(&t.count) <= 0
}
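AsyncTracker is aimed at controllers that hand initial-list items to worker goroutines; a sketch of that usage under assumed names (fanOutController, enqueue, processKey):

package example

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/cache/synctrack"
)

// fanOutController hands initial-list items to workers keyed by object key
// and uses AsyncTracker to compute an accurate HasSynced.
type fanOutController struct {
	tracker synctrack.AsyncTracker[string]
}

func newFanOutController(informerHasSynced func() bool) *fanOutController {
	c := &fanOutController{}
	c.tracker.UpstreamHasSynced = informerHasSynced
	return c
}

// enqueue is called from the handler's AddFunc, which supplies isInInitialList.
func (c *fanOutController) enqueue(obj interface{}, isInInitialList bool) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		return
	}
	if isInInitialList {
		c.tracker.Start(key) // we now owe a Finished for this key
	}
	// ... add key to a workqueue here ...
}

// processKey is called by a worker once the key has been fully handled.
func (c *fanOutController) processKey(key string) {
	// ... reconcile the object for key ...
	c.tracker.Finished(key) // harmless even if Start was never called for key
}

// HasSynced is true once the informer is synced and every initial-list key
// has been processed by some worker.
func (c *fanOutController) HasSynced() bool { return c.tracker.HasSynced() }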

tools/cache/synctrack/synctrack_test.go (vendored, new file, 239 lines)
View File

@ -0,0 +1,239 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package synctrack
import (
"strings"
"sync"
"time"
"testing"
)
func testSingleFileFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) {
tracker := SingleFileTracker{
UpstreamHasSynced: upstreamHasSynced,
}
return tracker.Start, tracker.Finished, tracker.HasSynced
}
func testAsyncFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) {
tracker := AsyncTracker[string]{
UpstreamHasSynced: upstreamHasSynced,
}
return func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced
}
func TestBasicLogic(t *testing.T) {
table := []struct {
name string
construct func(func() bool) (func(), func(), func() bool)
}{
{"SingleFile", testSingleFileFuncs},
{"Async", testAsyncFuncs},
}
for _, entry := range table {
t.Run(entry.name, func(t *testing.T) {
table := []struct {
synced bool
start bool
finish bool
expectSynced bool
}{
{false, true, true, false},
{true, true, false, false},
{false, true, false, false},
{true, true, true, true},
}
for _, tt := range table {
Start, Finished, HasSynced := entry.construct(func() bool { return tt.synced })
if tt.start {
Start()
}
if tt.finish {
Finished()
}
got := HasSynced()
if e, a := tt.expectSynced, got; e != a {
t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
}
}
})
}
}
func TestAsyncLocking(t *testing.T) {
aft := AsyncTracker[int]{UpstreamHasSynced: func() bool { return true }}
var wg sync.WaitGroup
for _, i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
wg.Add(1)
go func(i int) {
aft.Start(i)
go func() {
aft.Finished(i)
wg.Done()
}()
}(i)
}
wg.Wait()
if !aft.HasSynced() {
t.Errorf("async tracker must have made a threading error?")
}
}
func TestSingleFileCounting(t *testing.T) {
sft := SingleFileTracker{UpstreamHasSynced: func() bool { return true }}
for i := 0; i < 100; i++ {
sft.Start()
}
if sft.HasSynced() {
t.Fatal("Unexpectedly synced?")
}
for i := 0; i < 99; i++ {
sft.Finished()
}
if sft.HasSynced() {
t.Fatal("Unexpectedly synced?")
}
sft.Finished()
if !sft.HasSynced() {
t.Fatal("Unexpectedly not synced?")
}
// Calling an extra time will panic.
func() {
defer func() {
x := recover()
if x == nil {
t.Error("no panic?")
return
}
msg, ok := x.(string)
if !ok {
t.Errorf("unexpected panic value: %v", x)
return
}
if !strings.Contains(msg, "negative counter") {
t.Errorf("unexpected panic message: %v", msg)
return
}
}()
sft.Finished()
}()
// Negative counter still means it is synced
if !sft.HasSynced() {
t.Fatal("Unexpectedly not synced?")
}
}
func TestSingleFile(t *testing.T) {
table := []struct {
synced bool
starts int
stops int
expectSynced bool
}{
{false, 1, 1, false},
{true, 1, 0, false},
{false, 1, 0, false},
{true, 1, 1, true},
}
for _, tt := range table {
sft := SingleFileTracker{UpstreamHasSynced: func() bool { return tt.synced }}
for i := 0; i < tt.starts; i++ {
sft.Start()
}
for i := 0; i < tt.stops; i++ {
sft.Finished()
}
got := sft.HasSynced()
if e, a := tt.expectSynced, got; e != a {
t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
}
}
}
func TestNoStaleValue(t *testing.T) {
table := []struct {
name string
construct func(func() bool) (func(), func(), func() bool)
}{
{"SingleFile", testSingleFileFuncs},
{"Async", testAsyncFuncs},
}
for _, entry := range table {
t.Run(entry.name, func(t *testing.T) {
var lock sync.Mutex
upstreamHasSynced := func() bool {
lock.Lock()
defer lock.Unlock()
return true
}
Start, Finished, HasSynced := entry.construct(upstreamHasSynced)
// Ordinarily the corresponding lock would be held and you wouldn't be
// able to call this function at this point.
if !HasSynced() {
t.Fatal("Unexpectedly not synced??")
}
Start()
if HasSynced() {
t.Fatal("Unexpectedly synced??")
}
Finished()
if !HasSynced() {
t.Fatal("Unexpectedly not synced??")
}
// Now we will prove that if the lock is held, you can't get a false
// HasSynced return.
lock.Lock()
// This goroutine calls HasSynced
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if HasSynced() {
t.Error("Unexpectedly synced??")
}
}()
// This goroutine increments + unlocks. The sleep is to bias the
// runtime such that the other goroutine usually wins (it needs to work
// in both orderings, this one is more likely to be buggy).
go func() {
time.Sleep(time.Millisecond)
Start()
lock.Unlock()
}()
wg.Wait()
})
}
}