Merge pull request #135395 from pohly/apimachinery-wait-for-cache-sync

apimachinery + client-go + device taint eviction unit test: context-aware Start/WaitFor, waiting through channels

Kubernetes-commit: eb09a3c23e3c3905c89e996fcec2c02ba8c4bb0e
Kubernetes Publisher committed 2026-02-11 06:11:59 +05:30
17 changed files with 1283 additions and 203 deletions
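
A minimal sketch of how the new context-aware factory calls fit together (illustration only, not part of this diff; it mirrors the updated SharedInformerFactory doc comment below, and the clientset parameter and the Pods informer are placeholders):

package example

import (
	"context"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

// startAndSync shows the flow added by this PR: StartWithContext replaces
// Start(stopCh), and WaitForCacheSyncWithContext returns a cache.SyncResult
// instead of a plain map[reflect.Type]bool.
func startAndSync(ctx context.Context, clientset kubernetes.Interface) error {
	factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)
	defer factory.Shutdown()

	// Request the informers that are needed before starting the factory.
	factory.Core().V1().Pods().Informer()

	factory.StartWithContext(ctx) // informers run until ctx is canceled

	synced := factory.WaitForCacheSyncWithContext(ctx)
	if err := synced.AsError(); err != nil {
		// Wraps context.Cause(ctx) and lists the informer types which did not sync.
		return err
	}
	return nil
}

Compared to the old Start(stopCh)/WaitForCacheSync(stopCh) pair, cancellation and the failure cause now both come from the same context.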

View File

@@ -19,6 +19,7 @@ limitations under the License.
package informers
import (
context "context"
reflect "reflect"
sync "sync"
time "time"
@@ -26,6 +27,7 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
schema "k8s.io/apimachinery/pkg/runtime/schema"
wait "k8s.io/apimachinery/pkg/util/wait"
admissionregistration "k8s.io/client-go/informers/admissionregistration"
apiserverinternal "k8s.io/client-go/informers/apiserverinternal"
apps "k8s.io/client-go/informers/apps"
@@ -158,6 +160,10 @@ func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultRes
}
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.StartWithContext(wait.ContextForChannel(stopCh))
}
func (f *sharedInformerFactory) StartWithContext(ctx context.Context) {
f.lock.Lock()
defer f.lock.Unlock()
@@ -167,15 +173,9 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
f.wg.Add(1)
// We need a new variable in each loop iteration,
// otherwise the goroutine would use the loop variable
// and that keeps changing.
informer := informer
go func() {
defer f.wg.Done()
informer.Run(stopCh)
}()
f.wg.Go(func() {
informer.RunWithContext(ctx)
})
f.startedInformers[informerType] = true
}
}
@@ -192,6 +192,11 @@ func (f *sharedInformerFactory) Shutdown() {
}
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
result := f.WaitForCacheSyncWithContext(wait.ContextForChannel(stopCh))
return result.Synced
}
func (f *sharedInformerFactory) WaitForCacheSyncWithContext(ctx context.Context) cache.SyncResult {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
@@ -205,10 +210,31 @@ func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[ref
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
// Wait for informers to sync, without polling.
cacheSyncs := make([]cache.DoneChecker, 0, len(informers))
for _, informer := range informers {
cacheSyncs = append(cacheSyncs, informer.HasSyncedChecker())
}
cache.WaitFor(ctx, "" /* no logging */, cacheSyncs...)
res := cache.SyncResult{
Synced: make(map[reflect.Type]bool, len(informers)),
}
failed := false
for informType, informer := range informers {
hasSynced := informer.HasSynced()
if !hasSynced {
failed = true
}
res.Synced[informType] = hasSynced
}
if failed {
// context.Cause is more informative than ctx.Err().
// This must be non-nil, otherwise WaitFor wouldn't have stopped
// prematurely.
res.Err = context.Cause(ctx)
}
return res
}
@@ -247,27 +273,46 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
// defer factory.WaitForStop() // Returns immediately if nothing was started.
// genericInformer := factory.ForResource(resource)
// typedInformer := factory.SomeAPIGroup().V1().SomeType()
// factory.Start(ctx.Done()) // Start processing these informers.
// synced := factory.WaitForCacheSync(ctx.Done())
// for v, ok := range synced {
// if !ok {
// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v)
// return
// }
// handle, err := typeInformer.Informer().AddEventHandler(...)
// if err != nil {
// return fmt.Errorf("register event handler: %v", err)
// }
// defer typeInformer.Informer().RemoveEventHandler(handle) // Avoids leaking goroutines.
// factory.StartWithContext(ctx) // Start processing these informers.
// synced := factory.WaitForCacheSyncWithContext(ctx)
// if err := synced.AsError(); err != nil {
// return err
// }
// for v := range synced.Synced {
// // Optionally, log some information similar to this.
// fmt.Fprintf(os.Stdout, "cache synced: %s", v)
// }
//
// // Also make sure that all of the initial cache events have been delivered.
// if !WaitFor(ctx, "event handler sync", handle.HasSyncedChecker()) {
// // Must have failed because of context.
// return fmt.Errorf("sync event handler: %w", context.Cause(ctx))
// }
//
// // Informers can also be created after Start, but then
// // Start must be called again:
// anotherGenericInformer := factory.ForResource(resource)
// factory.Start(ctx.Done())
// factory.StartWithContext(ctx)
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
// Start initializes all requested informers. They are handled in goroutines
// which run until the stop channel gets closed.
// Warning: Start does not block. When run in a go-routine, it will race with a later WaitForCacheSync.
//
// Contextual logging: StartWithContext should be used instead of Start in code which supports contextual logging.
Start(stopCh <-chan struct{})
// StartWithContext initializes all requested informers. They are handled in goroutines
// which run until the context gets canceled.
// Warning: StartWithContext does not block. When run in a go-routine, it will race with a later WaitForCacheSync.
StartWithContext(ctx context.Context)
// Shutdown marks a factory as shutting down. At that point no new
// informers can be started anymore and Start will return without
// doing anything.
@@ -282,8 +327,14 @@ type SharedInformerFactory interface {
// WaitForCacheSync blocks until all started informers' caches were synced
// or the stop channel gets closed.
//
// Contextual logging: WaitForCacheSyncWithContext should be used instead of WaitForCacheSync in code which supports contextual logging. It also returns a more useful result.
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
// WaitForCacheSyncWithContext blocks until all started informers' caches were synced
// or the context gets canceled.
WaitForCacheSyncWithContext(ctx context.Context) cache.SyncResult
// ForResource gives generic access to a shared informer of the matching type.
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)

View File

@@ -27,6 +27,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientgofeaturegate "k8s.io/client-go/features"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
@@ -141,6 +142,11 @@ type Controller interface {
// HasSynced delegates to the Config's Queue
HasSynced() bool
// HasSyncedChecker enables waiting for syncing without polling.
// The returned DoneChecker can be passed to WaitFor.
// It delegates to the Config's Queue.
HasSyncedChecker() DoneChecker
// LastSyncResourceVersion delegates to the Reflector when there
// is one, otherwise returns the empty string
LastSyncResourceVersion() string
@@ -167,11 +173,13 @@ func (c *controller) RunWithContext(ctx context.Context) {
<-ctx.Done()
c.config.Queue.Close()
}()
logger := klog.FromContext(ctx)
r := NewReflectorWithOptions(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
ReflectorOptions{
Logger: &logger,
ResyncPeriod: c.config.FullResyncPeriod,
MinWatchTimeout: c.config.MinWatchTimeout,
TypeDescription: c.config.ObjectDescription,
@@ -205,6 +213,13 @@ func (c *controller) HasSynced() bool {
return c.config.Queue.HasSynced()
}
// HasSyncedChecker enables waiting for syncing without polling.
// The returned DoneChecker can be passed to [WaitFor].
// It delegates to the Config's Queue.
func (c *controller) HasSyncedChecker() DoneChecker {
return c.config.Queue.HasSyncedChecker()
}
func (c *controller) LastSyncResourceVersion() string {
c.reflectorMutex.RLock()
defer c.reflectorMutex.RUnlock()
@@ -395,6 +410,9 @@ func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) {
// InformerOptions configure a Reflector.
type InformerOptions struct {
// Logger, if not nil, is used instead of klog.Background() for logging.
Logger *klog.Logger
// ListerWatcher implements List and Watch functions for the source of the resource
// the informer will be informing about.
ListerWatcher ListerWatcher
@@ -587,6 +605,7 @@ func NewTransformingIndexerInformer(
// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(
logger klog.Logger,
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
@@ -604,7 +623,7 @@ func processDeltas(
if !ok {
return fmt.Errorf("ReplacedAll did not contain ReplacedAllInfo: %T", obj)
}
if err := processReplacedAllInfo(handler, info, clientState, isInInitialList, keyFunc); err != nil {
if err := processReplacedAllInfo(logger, handler, info, clientState, isInInitialList, keyFunc); err != nil {
return err
}
case SyncAll:
@@ -649,6 +668,7 @@ func processDeltas(
// Returns an error if any Delta or transaction fails. For TransactionError,
// only successful operations trigger callbacks.
func processDeltasInBatch(
logger klog.Logger,
handler ResourceEventHandler,
clientState Store,
deltas []Delta,
@@ -662,7 +682,7 @@ func processDeltasInBatch(
if !txnSupported {
var errs []error
for _, delta := range deltas {
if err := processDeltas(handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil {
if err := processDeltas(logger, handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil {
errs = append(errs, err)
}
}
@@ -727,7 +747,7 @@ func processDeltasInBatch(
return nil
}
func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error {
func processReplacedAllInfo(logger klog.Logger, handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error {
var deletions []DeletedFinalStateUnknown
type replacement struct {
oldObj interface{}
@@ -735,7 +755,7 @@ func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo,
}
replacements := make([]replacement, 0, len(info.Objects))
err := reconcileReplacement(nil, clientState, info.Objects, keyFunc,
err := reconcileReplacement(logger, nil, clientState, info.Objects, keyFunc,
func(obj DeletedFinalStateUnknown) error {
deletions = append(deletions, obj)
return nil
@@ -784,7 +804,11 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := newQueueFIFO(clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider)
logger := klog.Background()
if options.Logger != nil {
logger = *options.Logger
}
logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider)
cfg := &Config{
Queue: fifo,
@@ -795,20 +819,30 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(options.Handler, clientState, deltas, isInInitialList, keyFunc)
// This must be the logger *of the fifo*.
return processDeltas(logger, options.Handler, clientState, deltas, isInInitialList, keyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(options.Handler, clientState, deltaList, isInInitialList, keyFunc)
// Same here.
return processDeltasInBatch(logger, options.Handler, clientState, deltaList, isInInitialList, keyFunc)
},
}
return New(cfg)
}
func newQueueFIFO(clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue {
// newQueueFIFO constructs a new FIFO, choosing between real and delta FIFO
// depending on the InOrderInformers feature gate.
//
// It returns the FIFO and the logger used by the FIFO.
// That logger includes the name used for the FIFO,
// in contrast to the logger which was passed in.
func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) (klog.Logger, Queue) {
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
options := RealFIFOOptions{
Logger: &logger,
Name: fmt.Sprintf("RealFIFO %T", objectType),
KeyFunction: MetaNamespaceKeyFunc,
Transformer: transform,
Identifier: identifier,
@@ -821,12 +855,16 @@ func newQueueFIFO(clientState Store, transform TransformFunc, identifier Informe
} else {
options.KnownObjects = clientState
}
return NewRealFIFOWithOptions(options)
f := NewRealFIFOWithOptions(options)
return f.logger, f
} else {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
Logger: &logger,
Name: fmt.Sprintf("DeltaFIFO %T", objectType),
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transform,
})
return f.logger, f
}
}
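
A hedged sketch of the new knobs in this file: InformerOptions.Logger feeds a contextual logger into the reflector and FIFO, and Controller.HasSyncedChecker allows waiting via cache.WaitFor instead of polling HasSynced. It assumes the existing NewInformerWithOptions and NewListWatchFromClient helpers from client-go, which are not part of this diff:

package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

// runPodController wires a contextual logger into the informer machinery and
// waits for the initial LIST through a channel rather than by polling.
func runPodController(ctx context.Context, clientset kubernetes.Interface) error {
	logger := klog.FromContext(ctx)

	lw := cache.NewListWatchFromClient(
		clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything())

	_, controller := cache.NewInformerWithOptions(cache.InformerOptions{
		Logger:        &logger, // new field, see above
		ListerWatcher: lw,
		ObjectType:    &v1.Pod{},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) { /* handle add */ },
		},
	})

	go controller.RunWithContext(ctx)

	// Blocks until the initial LIST has been popped or ctx is canceled.
	if !cache.WaitFor(ctx, "pod controller sync", controller.HasSyncedChecker()) {
		return fmt.Errorf("pod controller did not sync: %w", context.Cause(ctx))
	}
	return nil
}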

View File

@@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fcache "k8s.io/client-go/tools/cache/testing"
"k8s.io/klog/v2/ktesting"
)
const handlerWaitTime = time.Millisecond
@@ -33,7 +34,8 @@ const handlerWaitTime = time.Millisecond
func BenchmarkAddWithSlowHandlers(b *testing.B) {
for _, unlockWhileProcessing := range []bool{false, true} {
b.Run(fmt.Sprintf("unlockWhileProcessing=%t", unlockWhileProcessing), func(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(b)
ctx, cancel := context.WithCancel(ctx)
source := fcache.NewFakeControllerSource()
b.Cleanup(func() {
cancel()
@@ -65,12 +67,12 @@ func BenchmarkAddWithSlowHandlers(b *testing.B) {
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltas(logger, handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltasInBatch(logger, handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
},
}
c := New(cfg)

View File

@@ -834,6 +834,7 @@ func TestProcessDeltasInBatch(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
mockStore := &mockTxnStore{
Store: NewStore(MetaNamespaceKeyFunc),
failingObjs: tc.failingObjects,
@@ -851,6 +852,7 @@ func TestProcessDeltasInBatch(t *testing.T) {
},
}
err := processDeltasInBatch(
logger,
dummyListener,
mockStore,
tc.deltaList,
@@ -929,12 +931,12 @@ func TestReplaceEvents(t *testing.T) {
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
},
}
c := New(cfg)
@@ -1066,12 +1068,12 @@ func TestResetWatch(t *testing.T) {
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
},
}
c := New(cfg)

View File

@@ -22,6 +22,7 @@ import (
"sync"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
@@ -31,6 +32,12 @@ import (
// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
// optional.
type DeltaFIFOOptions struct {
// If set, log output will go to this logger instead of klog.Background().
// The name of the fifo gets added automatically.
Logger *klog.Logger
// Name can be used to override the default "DeltaFIFO" name for the new instance.
Name string
// KeyFunction is used to figure out what key an object should have. (It's
// exposed in the returned DeltaFIFO's KeyOf() method, with additional
@@ -55,9 +62,6 @@ type DeltaFIFOOptions struct {
// If set, will be called for objects before enqueueing them. Please
// see the comment on TransformFunc for details.
Transformer TransformFunc
// If set, log output will go to this logger instead of klog.Background().
Logger *klog.Logger
}
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
@@ -102,6 +106,13 @@ type DeltaFIFOOptions struct {
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
type DeltaFIFO struct {
// logger is a per-instance logger. This gets chosen when constructing
// the instance, with klog.Background() as default.
logger klog.Logger
// name is the name of the fifo. It is included in the logger.
name string
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
@@ -115,6 +126,11 @@ type DeltaFIFO struct {
// A key is in `queue` if and only if it is in `items`.
queue []string
// synced is initially an open channel. It gets closed (once!) by checkSynced_locked
// as soon as the initial sync is considered complete.
synced chan struct{}
syncedClosed bool
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update/AddIfNotPresent was called first.
populated bool
@@ -139,10 +155,6 @@ type DeltaFIFO struct {
// Called with every object if non-nil.
transformer TransformFunc
// logger is a per-instance logger. This gets chosen when constructing
// the instance, with klog.Background() as default.
logger klog.Logger
}
// TransformFunc allows for transforming an object before it will be processed.
@@ -263,6 +275,9 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
}
f := &DeltaFIFO{
logger: klog.Background(),
name: "DeltaFIFO",
synced: make(chan struct{}),
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
@@ -270,11 +285,14 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
transformer: opts.Transformer,
logger: klog.Background(),
}
if opts.Logger != nil {
f.logger = *opts.Logger
}
if name := opts.Name; name != "" {
f.name = name
}
f.logger = klog.LoggerWithName(f.logger, f.name)
f.cond.L = &f.lock
return f
}
@@ -282,6 +300,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
var (
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
_ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations
_ = DoneChecker(&DeltaFIFO{}) // DeltaFIFO implements DoneChecker.
)
var (
@@ -327,8 +346,36 @@ func (f *DeltaFIFO) HasSynced() bool {
return f.hasSynced_locked()
}
// HasSyncedChecker is done if one of Add/Update/Delete/AddIfNotPresent is called first,
// or the first batch of items inserted by Replace() has been popped.
func (f *DeltaFIFO) HasSyncedChecker() DoneChecker {
return f
}
// Name implements [DoneChecker.Name]
func (f *DeltaFIFO) Name() string {
return f.name
}
// Done implements [DoneChecker.Done]
func (f *DeltaFIFO) Done() <-chan struct{} {
return f.synced
}
// hasSynced_locked returns the result of a prior checkSynced_locked call.
func (f *DeltaFIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
return f.syncedClosed
}
// checkSynced_locked checks whether the initial sync is completed.
// It must be called whenever populated or initialPopulationCount changes.
func (f *DeltaFIFO) checkSynced_locked() {
synced := f.populated && f.initialPopulationCount == 0
if synced && !f.syncedClosed {
// Initial sync is complete.
f.syncedClosed = true
close(f.synced)
}
}
// Add inserts an item, and puts it in the queue. The item is only enqueued
@@ -337,6 +384,7 @@ func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
return f.queueActionLocked(Added, obj)
}
@@ -345,6 +393,7 @@ func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
return f.queueActionLocked(Updated, obj)
}
@@ -361,6 +410,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
if f.knownObjects == nil {
if _, exists := f.items[id]; !exists {
// Presumably, this was deleted when a relist happened.
@@ -477,10 +527,10 @@ func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType Del
// when given a non-empty list (as it is here).
// If somehow it happens anyway, deal with it but complain.
if oldDeltas == nil {
f.logger.Error(nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj)
utilruntime.HandleErrorWithLogger(f.logger, nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj)
return nil
}
f.logger.Error(nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj)
utilruntime.HandleErrorWithLogger(f.logger, nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
}
@@ -526,11 +576,12 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
f.checkSynced_locked()
}
item, ok := f.items[id]
if !ok {
// This should never happen
f.logger.Error(nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id)
utilruntime.HandleErrorWithLogger(f.logger, nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id)
continue
}
delete(f.items, id)
@@ -623,7 +674,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
f.logger.Error(err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k)
utilruntime.HandleErrorWithLogger(f.logger, err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k)
} else if !exists {
deletedObj = nil
f.logger.Info("Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", k)
@@ -638,6 +689,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
if !f.populated {
f.populated = true
f.initialPopulationCount = keys.Len() + queuedDeletions
f.checkSynced_locked()
}
return nil
@@ -666,7 +718,7 @@ func (f *DeltaFIFO) Resync() error {
func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
f.logger.Error(err, "Unexpected error during lookup, unable to queue object for sync", "key", key)
utilruntime.HandleErrorWithLogger(f.logger, err, "Unexpected error during lookup, unable to queue object for sync", "key", key)
return nil
} else if !exists {
f.logger.Info("Key does not exist in known objects store, unable to queue object for sync", "key", key)
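
A small sketch of the reworked DeltaFIFO options and the new channel-based sync signal (illustration only; the pod object and the chosen name are arbitrary):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

func main() {
	logger := klog.Background()

	// Named, logger-aware DeltaFIFO, using the options shown above.
	fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		Logger:      &logger,             // the name below gets appended to this logger
		Name:        "DeltaFIFO *v1.Pod", // overrides the default "DeltaFIFO"
		KeyFunction: cache.MetaNamespaceKeyFunc,
	})

	// The checker becomes done once the first batch inserted by Replace has
	// been popped, or as soon as Add/Update/Delete is called first.
	checker := fifo.HasSyncedChecker()
	fmt.Println(checker.Name(), "synced:", cache.IsDone(checker)) // false

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"}}
	if err := fifo.Add(pod); err != nil {
		panic(err)
	}
	fmt.Println(checker.Name(), "synced:", cache.IsDone(checker)) // true
}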

tools/cache/event_handler_name.go (new file, 121 lines)
View File

@@ -0,0 +1,121 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"reflect"
"runtime"
"strings"
)
func nameForHandler(handler ResourceEventHandler) (name string) {
defer func() {
// Last resort: let Sprintf handle it.
if name == "" {
name = fmt.Sprintf("%T", handler)
}
}()
if handler == nil {
return ""
}
switch handler := handler.(type) {
case *ResourceEventHandlerFuncs:
return nameForHandlerFuncs(*handler)
case ResourceEventHandlerFuncs:
return nameForHandlerFuncs(handler)
default:
// We can use the fully qualified name of whatever
// provides the interface. We don't care whether
// it contains fields or methods which provide
// the interface methods.
value := reflect.ValueOf(handler)
if value.Type().Kind() == reflect.Interface {
// Probably not needed, but let's play it safe.
value = value.Elem()
}
if value.Type().Kind() == reflect.Pointer {
if !value.IsNil() {
value = value.Elem()
}
}
name := value.Type().PkgPath()
if name != "" {
name += "."
}
if typeName := value.Type().Name(); typeName != "" {
name += typeName
}
return name
}
}
func nameForHandlerFuncs(funcs ResourceEventHandlerFuncs) string {
return nameForFunctions(funcs.AddFunc, funcs.UpdateFunc, funcs.DeleteFunc)
}
func nameForFunctions(fs ...any) string {
// If all functions are defined in the same place, then we
// don't care about the actual function name in
// e.g. "main.FuncName" or "main.(*Foo).FuncName-fm", instead
// we use the common qualifier.
//
// But we don't know that yet, so we also collect all names.
var qualifier string
singleQualifier := true
var names []string
for _, f := range fs {
if f == nil {
continue
}
name := nameForFunction(f)
if name == "" {
continue
}
names = append(names, name)
newQualifier := name
index := strings.LastIndexByte(newQualifier, '.')
if index > 0 {
newQualifier = newQualifier[:index]
}
switch qualifier {
case "":
qualifier = newQualifier
case newQualifier:
// So far, so good...
default:
// Nope, different.
singleQualifier = false
}
}
if singleQualifier {
return qualifier
}
return strings.Join(names, "+")
}
func nameForFunction(f any) string {
fn := runtime.FuncForPC(reflect.ValueOf(f).Pointer())
if fn == nil {
return ""
}
return fn.Name()
}
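
For context, the naming above builds on runtime.FuncForPC. A standalone sketch (unrelated to client-go) of the raw names it produces, including the "-fm" suffix for method values and "funcN" for literals, which the qualifier logic then trims:

package main

import (
	"fmt"
	"reflect"
	"runtime"
)

type foo struct{}

func (foo) OnAdd(obj any, isInInitialList bool) {}

func main() {
	f := func(any) {} // a function literal, like an AddFunc
	m := foo{}.OnAdd  // a method value, like UpdateFunc: someStruct.OnUpdate

	// runtime.FuncForPC reports fully qualified names; method values get a
	// "-fm" suffix and literals get "funcN" appended to the enclosing function.
	fmt.Println(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()) // main.main.func1
	fmt.Println(runtime.FuncForPC(reflect.ValueOf(m).Pointer()).Name()) // main.foo.OnAdd-fm
}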

tools/cache/event_handler_name_test.go (new file, 93 lines)
View File

@@ -0,0 +1,93 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"testing"
)
type mockHandler struct{}
func (m mockHandler) OnAdd(any, bool) {}
func (m mockHandler) OnUpdate(any, any) {}
func (m mockHandler) OnDelete(any) {}
func TestNameForHandler(t *testing.T) {
emptyHandler := ResourceEventHandlerFuncs{}
for name, tc := range map[string]struct {
handler ResourceEventHandler
wantName string
}{
"mixture": {
handler: ResourceEventHandlerFuncs{
UpdateFunc: emptyHandler.OnUpdate,
DeleteFunc: func(any) {},
},
wantName: "k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnUpdate-fm+k8s.io/client-go/tools/cache.TestNameForHandler.func1", // Testcase must come first to get func1.
},
"add": {
handler: ResourceEventHandlerFuncs{AddFunc: func(any) {}},
wantName: "k8s.io/client-go/tools/cache.TestNameForHandler",
},
"update": {
handler: ResourceEventHandlerFuncs{UpdateFunc: func(any, any) {}},
wantName: "k8s.io/client-go/tools/cache.TestNameForHandler",
},
"delete": {
handler: ResourceEventHandlerFuncs{DeleteFunc: func(any) {}},
wantName: "k8s.io/client-go/tools/cache.TestNameForHandler",
},
"all": {
handler: ResourceEventHandlerFuncs{
AddFunc: func(any) {},
UpdateFunc: func(any, any) {},
DeleteFunc: func(any) {},
},
wantName: "k8s.io/client-go/tools/cache.TestNameForHandler",
},
"ptrToFuncs": {
handler: &ResourceEventHandlerFuncs{AddFunc: func(any) {}},
wantName: "k8s.io/client-go/tools/cache.TestNameForHandler",
},
"struct": {
handler: mockHandler{},
wantName: "k8s.io/client-go/tools/cache.mockHandler",
},
"ptrToStruct": {
handler: &mockHandler{},
wantName: "k8s.io/client-go/tools/cache.mockHandler",
},
"nil": {
handler: nil,
wantName: "<nil>",
},
"stored-nil": {
// This is a bit odd, but one unit test actually registered
// such an event handler and it somehow worked.
handler: (*mockHandler)(nil),
wantName: "*cache.mockHandler",
},
} {
t.Run(name, func(t *testing.T) {
gotName := nameForHandler(tc.handler)
if gotName != tc.wantName {
t.Errorf("Got name:\n %s\nWanted name:\n %s", gotName, tc.wantName)
}
})
}
}

tools/cache/fifo.go (50 lines changed)
View File

@@ -58,6 +58,12 @@ type Queue interface {
// Update, or Delete; otherwise the first batch is empty.
HasSynced() bool
// HasSyncedChecker is done once the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, AddIfNotPresent,
// Update, or Delete; otherwise the first batch is empty.
HasSyncedChecker() DoneChecker
// Close the queue
Close()
}
@@ -110,6 +116,11 @@ type FIFO struct {
items map[string]interface{}
queue []string
// synced is initially an open channel. It gets closed (once!) by checkSynced
// as soon as the initial sync is considered complete.
synced chan struct{}
syncedClosed bool
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
@@ -127,7 +138,8 @@ type FIFO struct {
}
var (
_ = Queue(&FIFO{}) // FIFO is a Queue
_ = Queue(&FIFO{}) // FIFO is a Queue
_ = DoneChecker(&FIFO{}) // ... and implements DoneChecker.
)
// Close the queue.
@@ -146,8 +158,36 @@ func (f *FIFO) HasSynced() bool {
return f.hasSynced_locked()
}
// HasSyncedChecker is done if one of Add/Update/Delete/AddIfNotPresent is called first,
// or the first batch of items inserted by Replace() has been popped.
func (f *FIFO) HasSyncedChecker() DoneChecker {
return f
}
// Name implements [DoneChecker.Name]
func (f *FIFO) Name() string {
return "FIFO" // FIFO doesn't seem to be used outside of a few tests, so changing the NewFIFO API to pass in a name doesn't seem worth it.
}
// Done implements [DoneChecker.Done]
func (f *FIFO) Done() <-chan struct{} {
return f.synced
}
// hasSynced_locked returns the result of a prior checkSynced call.
func (f *FIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
return f.syncedClosed
}
// checkSynced checks whether the initial sync is completed.
// It must be called whenever populated or initialPopulationCount changes
// while the mutex is still locked.
func (f *FIFO) checkSynced() {
synced := f.populated && f.initialPopulationCount == 0
if synced && !f.syncedClosed {
f.syncedClosed = true
close(f.synced)
}
}
// Add inserts an item, and puts it in the queue. The item is only enqueued
@@ -160,6 +200,7 @@ func (f *FIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
f.checkSynced()
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
@@ -184,6 +225,7 @@ func (f *FIFO) Delete(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
f.checkSynced()
delete(f.items, id)
return err
}
@@ -220,6 +262,8 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
// Must be done *after* process has completed.
defer f.checkSynced()
}
item, ok := f.items[id]
if !ok {
@@ -252,6 +296,7 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
if !f.populated {
f.populated = true
f.initialPopulationCount = len(items)
f.checkSynced()
}
f.items = items
@@ -290,6 +335,7 @@ func (f *FIFO) Resync() error {
// process.
func NewFIFO(keyFunc KeyFunc) *FIFO {
f := &FIFO{
synced: make(chan struct{}),
items: map[string]interface{}{},
queue: []string{},
keyFunc: keyFunc,

View File

@@ -17,6 +17,7 @@ limitations under the License.
package cache
import (
"context"
"sync"
"testing"
"time"
@@ -29,6 +30,25 @@ const (
concurrencyLevel = 5
)
type mockSynced struct {
context.Context
cancel func()
}
func newMockSynced(tb testing.TB, synced bool) *mockSynced {
m := &mockSynced{}
m.Context, m.cancel = context.WithCancel(context.Background())
if synced {
m.cancel()
}
tb.Cleanup(m.cancel)
return m
}
func (m *mockSynced) Name() string {
return "mock"
}
func BenchmarkListener(b *testing.B) {
var notification addNotification
@@ -40,7 +60,7 @@ func BenchmarkListener(b *testing.B) {
AddFunc: func(obj interface{}) {
swg.Done()
},
}, 0, 0, time.Now(), 1024*1024, func() bool { return true })
}, 0, 0, time.Now(), 1024*1024, newMockSynced(b, true))
var wg wait.Group
defer wg.Wait() // Wait for .run and .pop to stop
defer close(pl.addCh) // Tell .run and .pop to stop

View File

@@ -85,6 +85,7 @@ type TransformingStore interface {
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
logger klog.Logger
// name identifies this reflector. By default, it will be a file:line if possible.
name string
// The name of the type we expect to place in the store. The name
@@ -227,6 +228,10 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
// ReflectorOptions configures a Reflector.
type ReflectorOptions struct {
// Logger, if not nil, is used instead of klog.Background() for logging.
// The name of the reflector gets added automatically.
Logger *klog.Logger
// Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line
// in the call stack that is outside this package.
Name string
@@ -289,6 +294,13 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R
r.name = naming.GetNameFromCallsite(internalPackages...)
}
logger := klog.Background()
if options.Logger != nil {
logger = *options.Logger
}
logger = klog.LoggerWithName(logger, r.name)
r.logger = logger
if r.typeDescription == "" {
r.typeDescription = getTypeDescriptionFromObject(expectedType)
}
@@ -299,9 +311,7 @@ func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store R
r.useWatchList = clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient)
if r.useWatchList && watchlist.DoesClientNotSupportWatchListSemantics(lw) {
// Using klog.TODO() here because switching to a caller-provided contextual logger
// would require an API change and updating all existing call sites.
klog.TODO().V(2).Info(
r.logger.V(2).Info(
"The client used to build this informer/reflector doesn't support WatchList semantics. The feature will be disabled. This is expected in unit tests but not in production. For details, see the documentation of watchlist.DoesClientNotSupportWatchListSemantics().",
"feature", clientfeatures.WatchListClient,
)

View File

@@ -68,12 +68,12 @@ func TestReflectorDataConsistencyDetector(t *testing.T) {
}
func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) {
_, ctx := ktesting.NewTestContext(t)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
store := NewStore(MetaNamespaceKeyFunc)
fifo := newQueueFIFO(store, transformer, InformerNameAndResource{}, nil)
_, fifo := newQueueFIFO(logger, nil, store, transformer, InformerNameAndResource{}, nil)
lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {

View File

@@ -20,6 +20,9 @@ import (
"context"
"errors"
"fmt"
"reflect"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
@@ -194,6 +197,14 @@ type SharedInformer interface {
// For that, please call HasSynced on the handle returned by
// AddEventHandler.
HasSynced() bool
// HasSyncedChecker is done once the shared informer's store has been
// informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync".
//
// Note that this doesn't tell you if an individual handler is synced!!
// For that, please use HasSyncedChecker on the handle returned by
// AddEventHandler.
HasSyncedChecker() DoneChecker
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
@@ -247,6 +258,10 @@ type ResourceEventHandlerRegistration interface {
// HasSynced reports if both the parent has synced and all pre-sync
// events have been delivered.
HasSynced() bool
// HasSyncedChecker is done once both the parent has synced and all pre-sync
// events have been delivered.
HasSyncedChecker() DoneChecker
}
// Optional configuration options for [SharedInformer.AddEventHandlerWithOptions].
@@ -309,6 +324,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O
return &sharedIndexInformer{
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
processor: processor,
synced: make(chan struct{}),
listerWatcher: lw,
objectType: exampleObject,
objectDescription: options.ObjectDescription,
@@ -414,6 +430,147 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
return true
}
// WaitFor waits for a set of activities to complete, like cache syncing.
// It returns true if it was successful, false if the context was canceled
// before all activities are completed.
//
// If a non-empty "what" is provided, then progress information is logged
// while waiting ("Waiting", for="<what>"). Verbosity is V(0). This can
// be made less verbose by the caller with:
//
// WaitFor(klog.NewContext(ctx, klog.FromContext(ctx).V(2)), ...)
//
// In contrast to other WaitForCacheSync alternatives, this one doesn't
// need polling, which makes it react immediately. When used in a synctest unit
// test, waiting completes without moving time forward randomly, which
// makes tests more predictable.
func WaitFor(ctx context.Context, what string, checkers ...DoneChecker) bool {
logger := klog.FromContext(ctx)
if what != "" {
helper, l := logger.WithCallStackHelper()
logger = l
helper()
logger.Info("Waiting", "for", what)
}
// Check in parallel to ensure that we log "Done waiting" as soon
// as possible for each checker. The timing may be useful to know.
// We cannot log inside the goroutine because the stack unwinding wouldn't
// work, so instead each goroutine just notifies the parent
// goroutine when its checker is done and the main goroutine then
// logs it.
var wg sync.WaitGroup
type result struct {
checker DoneChecker
done bool
}
doneChecker := make(chan result)
for _, checker := range checkers {
wg.Go(func() {
select {
case <-checker.Done():
doneChecker <- result{checker, true}
case <-ctx.Done():
// We can end up here even when the checker is already done,
// select is not deterministic. Check once more without blocking
// before finally giving up.
select {
case <-checker.Done():
doneChecker <- result{checker, true}
default:
doneChecker <- result{checker, false}
}
}
})
}
instances := make([]string, 0, len(checkers))
for range len(checkers) {
// We are guaranteed to get exactly one result from each goroutine, so this won't block forever.
result := <-doneChecker
if result.done {
if what != "" {
logger.Info("Done waiting", "for", what, "instance", result.checker.Name())
}
} else {
// We don't need this information unless we are a) logging or b) debugging interactively.
instances = append(instances, result.checker.Name())
}
}
wg.Wait()
if what != "" && len(instances) > 0 {
slices.Sort(instances)
logger.Info("Timed out waiting", "for", what, "cause", context.Cause(ctx), "instances", instances)
}
done := len(instances) == 0
return done
}
// DoneChecker, in contrast to [InformerSynced], supports waiting
// for some activity to finish without polling and has a name
// that describes itself.
//
// To check for completion without blocking, use [IsDone].
type DoneChecker interface {
// Name returns a string describing the entity that is being waited for.
//
// Note that this name might be computed, so callers should only
// get the name outside of a hot code path.
Name() string
// Done returns a channel that will be closed on completion
// of the activity.
Done() <-chan struct{}
}
// IsDone returns true if the activity is done, false otherwise.
func IsDone(checker DoneChecker) bool {
select {
case <-checker.Done():
return true
default:
return false
}
}
// SyncResult is the result of a shared informer factory's WaitForCacheSyncWithContext.
// Under the hood such factories use [WaitFor] to wait for all instantiated informers,
// then provide this summary of what was synced.
//
// Note that the informers may have synced already before all event handlers registered with
// those informers have synced. Code which wants to be sure that all of its state is up-to-date
// should do its own WaitFor with the informer's HasSyncedChecker() *and* the
// registration handle's HasSyncedChecker() results.
type SyncResult struct {
// Err is nil if all informer caches were synced, otherwise it is
// the reason why waiting for cache syncing stopped (= context.Cause(ctx)).
Err error
// Synced maps each registered informer in a SharedInformerFactory to
// true if it has synced, false otherwise.
Synced map[reflect.Type]bool
}
// AsError turns a SyncResult into an error if not all caches were synced,
// otherwise it returns nil. The error wraps context.Cause(ctx) and
// includes information about the informers which were not synced.
func (c SyncResult) AsError() error {
if c.Err == nil {
return nil
}
unsynced := make([]string, 0, len(c.Synced))
for t, synced := range c.Synced {
if !synced {
unsynced = append(unsynced, t.String())
}
}
slices.Sort(unsynced)
return fmt.Errorf("failed to sync all caches: %s: %w", strings.Join(unsynced, ", "), c.Err)
}
// `*sharedIndexInformer` implements SharedIndexInformer and has three
// main components. One is an indexed local cache, `indexer Indexer`.
// The second main component is a Controller that pulls
@@ -431,6 +588,10 @@ type sharedIndexInformer struct {
indexer Indexer
controller Controller
// synced gets created when creating the sharedIndexInformer.
// It gets closed once RunWithContext observes that the controller has synced.
synced chan struct{}
processor *sharedProcessor
cacheMutationDetector MutationDetector
@@ -494,6 +655,10 @@ func (v *dummyController) HasSynced() bool {
return v.informer.HasSynced()
}
func (v *dummyController) HasSyncedChecker() DoneChecker {
return v.informer.HasSyncedChecker()
}
func (v *dummyController) LastSyncResourceVersion() string {
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) {
return v.informer.LastSyncResourceVersion()
@@ -563,7 +728,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
fifo := newQueueFIFO(s.indexer, s.transform, s.identifier, s.fifoMetricsProvider)
logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider)
cfg := &Config{
Queue: fifo,
@@ -573,8 +738,12 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
FullResyncPeriod: s.resyncCheckPeriod,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
ProcessBatch: s.HandleBatchDeltas,
Process: func(obj interface{}, isInInitialList bool) error {
return s.handleDeltas(logger, obj, isInInitialList)
},
ProcessBatch: func(deltas []Delta, isInInitialList bool) error {
return s.handleBatchDeltas(logger, deltas, isInInitialList)
},
WatchErrorHandlerWithContext: s.watchErrorHandler,
}
@@ -594,6 +763,15 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
// has a RunWithContext method that we can use here.
wg.StartWithChannel(processorStopCtx.Done(), s.cacheMutationDetector.Run)
wg.StartWithContext(processorStopCtx, s.processor.run)
wg.Start(func() {
select {
case <-ctx.Done():
// We were stopped without completing the sync.
case <-s.controller.HasSyncedChecker().Done():
// Controller has synced and thus so have we.
close(s.synced)
}
})
defer func() {
s.startedLock.Lock()
@@ -610,13 +788,31 @@ func (s *sharedIndexInformer) HasStarted() bool {
}
func (s *sharedIndexInformer) HasSynced() bool {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.controller == nil {
select {
case <-s.synced:
return true
default:
return false
}
return s.controller.HasSynced()
}
func (s *sharedIndexInformer) HasSyncedChecker() DoneChecker {
return &sharedIndexInformerDone{
s: s,
}
}
// sharedIndexInformerDone implements [DoneChecker] for a [sharedIndexInformer].
type sharedIndexInformerDone struct {
s *sharedIndexInformer
}
func (sd *sharedIndexInformerDone) Name() string {
return fmt.Sprintf("SharedIndexInformer %T", sd.s.objectType)
}
func (sd *sharedIndexInformerDone) Done() <-chan struct{} {
return sd.s.synced
}
func (s *sharedIndexInformer) LastSyncResourceVersion() string {
@@ -708,7 +904,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa
}
}
listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSyncedChecker())
if !s.started {
return s.processor.addListener(listener), nil
@@ -734,23 +930,27 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa
// thread adding them and the counter is temporarily zero).
listener.add(addNotification{newObj: item, isInInitialList: true})
}
// Initial list is added, now we can allow the listener to detect that "upstream has synced".
s.processor.wg.Start(listener.watchSynced)
return handle, nil
}
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
func (s *sharedIndexInformer) handleDeltas(logger klog.Logger, obj interface{}, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, deltas, isInInitialList, s.keyFunc)
return processDeltas(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc)
}
return errors.New("object given as Process argument is not Deltas")
}
func (s *sharedIndexInformer) HandleBatchDeltas(deltas []Delta, isInInitialList bool) error {
func (s *sharedIndexInformer) handleBatchDeltas(logger klog.Logger, deltas []Delta, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
return processDeltasInBatch(s, s.indexer, deltas, isInInitialList, s.keyFunc)
return processDeltasInBatch(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc)
}
// Conforms to ResourceEventHandler
@@ -854,6 +1054,8 @@ func (p *sharedProcessor) addListener(listener *processorListener) ResourceEvent
p.listeners[listener] = true
if p.listenersStarted {
// Not starting listener.watchSynced!
// The caller must first add the initial list, then start it.
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
@@ -925,6 +1127,7 @@ func (p *sharedProcessor) run(ctx context.Context) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
for listener := range p.listeners {
p.wg.Start(listener.watchSynced)
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
@@ -986,7 +1189,7 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe
}
// processorListener relays notifications from a sharedProcessor to
// one ResourceEventHandler --- using two goroutines, two unbuffered
// one ResourceEventHandler --- using three goroutines, two unbuffered
// channels, and an unbounded ring buffer. The `add(notification)`
// function sends the given notification to `addCh`. One goroutine
// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
@@ -994,16 +1197,23 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe
// Another goroutine runs `run()`, which receives notifications from
// `nextCh` and synchronously invokes the appropriate handler method.
//
// The third goroutine watches the upstream "has synced" channel
// and notifies a SingleFileTracker instance. That instance then
// combines the upstream state and the processListener state to
// implement the overall "event handler has synced".
//
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
logger klog.Logger
nextCh chan interface{}
addCh chan interface{}
done chan struct{}
handler ResourceEventHandler
syncTracker *synctrack.SingleFileTracker
syncTracker *synctrack.SingleFileTracker
upstreamHasSynced DoneChecker
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
@@ -1041,13 +1251,21 @@ func (p *processorListener) HasSynced() bool {
return p.syncTracker.HasSynced()
}
func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
// HasSyncedChecker is done once the source informer has synced and all
// corresponding events have been delivered.
func (p *processorListener) HasSyncedChecker() DoneChecker {
return p.syncTracker
}
func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced DoneChecker) *processorListener {
ret := &processorListener{
logger: logger,
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
done: make(chan struct{}),
upstreamHasSynced: hasSynced,
handler: handler,
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
syncTracker: synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), nameForHandler(handler))),
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
@@ -1068,6 +1286,7 @@ func (p *processorListener) add(notification interface{}) {
func (p *processorListener) pop() {
defer utilruntime.HandleCrashWithLogger(p.logger)
defer close(p.nextCh) // Tell .run() to stop
defer close(p.done) // Tell .watchSynced() to stop
var nextCh chan<- interface{}
var notification interface{}
@@ -1131,6 +1350,16 @@ func (p *processorListener) run() {
}
}
func (p *processorListener) watchSynced() {
select {
case <-p.upstreamHasSynced.Done():
// Notify tracker that the upstream has synced.
p.syncTracker.UpstreamHasSynced()
case <-p.done:
// Give up waiting for sync.
}
}
// shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
// this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool {
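
A sketch of the pattern recommended in the SyncResult comment above: wait for the informer and for a specific handler registration through channels instead of polling. It assumes the informer is already running via RunWithContext somewhere else:

package example

import (
	"context"
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// waitForHandler waits for the informer itself *and* for delivery of the
// initial events to one specific handler, both through channels.
func waitForHandler(ctx context.Context, informer cache.SharedIndexInformer) error {
	handle, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) { /* handle add */ },
	})
	if err != nil {
		return fmt.Errorf("register event handler: %w", err)
	}
	defer func() {
		_ = informer.RemoveEventHandler(handle) // avoids leaking goroutines
	}()

	if !cache.WaitFor(ctx, "demo event handler sync",
		informer.HasSyncedChecker(), // store has seen a full LIST
		handle.HasSyncedChecker(),   // initial events delivered to this handler
	) {
		return fmt.Errorf("sync event handler: %w", context.Cause(ctx))
	}

	// The handler keeps receiving events until ctx is canceled.
	<-ctx.Done()
	return nil
}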

View File

@@ -1134,8 +1134,13 @@ func TestAddWhileActive(t *testing.T) {
return
}
if !handle1.HasSynced() {
t.Error("Not synced after Run??")
select {
case <-handle1.HasSyncedChecker().Done():
if !handle1.HasSynced() {
t.Error("Not synced after channel said we are synced??")
}
case <-time.After(10 * time.Second):
t.Error("Not synced 10 seconds after Run??")
}
listener2.lock.Lock() // ensure we observe it before it has synced

View File

@@ -20,6 +20,7 @@ limitations under the License.
package synctrack
import (
"context"
"sync"
"sync/atomic"
@@ -27,11 +28,32 @@ import (
)
// AsyncTracker helps propagate HasSynced in the face of multiple worker threads.
// The user has to monitor the upstream "has synced"
// and notify the tracker when that changes from false to true.
type AsyncTracker[T comparable] struct {
UpstreamHasSynced func() bool
// name describes the instance.
name string
// upstreamHasSynced is changed from false (initial value) to true
// when UpstreamHasSynced is called.
upstreamHasSynced atomic.Bool
lock sync.Mutex
waiting sets.Set[T]
// synced gets canceled once both the tracker and upstream are synced.
// A context is convenient for this because it gives us a channel
// and handles thread-safety.
synced context.Context
cancel func()
}
func NewAsyncTracker[T comparable](name string) *AsyncTracker[T] {
t := &AsyncTracker[T]{
name: name,
}
t.synced, t.cancel = context.WithCancel(context.Background())
return t
}
// Start should be called prior to processing each key which is part of the
@@ -57,6 +79,28 @@ func (t *AsyncTracker[T]) Finished(key T) {
if t.waiting != nil {
t.waiting.Delete(key)
}
// Maybe synced now?
if t.upstreamHasSynced.Load() && len(t.waiting) == 0 {
// Mark as synced.
t.cancel()
}
}
// UpstreamHasSynced needs to be called at least once as soon as
// the upstream "has synced" becomes true. It tells AsyncTracker
// that the source is synced.
//
// Must be called after handing over the initial list to Start.
func (t *AsyncTracker[T]) UpstreamHasSynced() {
// Upstream is done, but we might not be yet.
t.upstreamHasSynced.Store(true)
t.lock.Lock()
defer t.lock.Unlock()
if len(t.waiting) == 0 {
// Mark as synced.
t.cancel()
}
}
// HasSynced returns true if the source is synced and every key present in the
@@ -64,27 +108,51 @@ func (t *AsyncTracker[T]) Finished(key T) {
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *AsyncTracker[T]) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we can't hold our lock while
// waiting on that or a user is likely to get a deadlock.
if !t.UpstreamHasSynced() {
return false
}
t.lock.Lock()
defer t.lock.Unlock()
return t.waiting.Len() == 0
return t.synced.Err() != nil
}
// Done returns a channel that is closed if the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *AsyncTracker[T]) Done() <-chan struct{} {
return t.synced.Done()
}
func (t *AsyncTracker[T]) Name() string {
return t.name
}
// SingleFileTracker helps propagate HasSynced when events are processed in
// order (i.e. via a queue).
// order (i.e. via a queue). The user has to monitor the upstream "has synced"
// and notify the tracker when that changes from false to true.
type SingleFileTracker struct {
// name describes the instance.
name string
// Important: count is used with atomic operations so it must be 64-bit
// aligned, otherwise atomic operations will panic. Having it at the top of
// the struct will guarantee that, even on 32-bit arches.
// See https://pkg.go.dev/sync/atomic#pkg-note-BUG for more information.
count int64
UpstreamHasSynced func() bool
// upstreamHasSynced is changed from false (initial value) to true
// when UpstreamHasSynced is called.
upstreamHasSynced atomic.Bool
// synced gets canceled once both the tracker and upstream are synced.
// A context is convenient for this because it gives us a channel
// and handles thread-safety.
synced context.Context
cancel func()
}
func NewSingleFileTracker(name string) *SingleFileTracker {
t := &SingleFileTracker{
name: name,
}
t.synced, t.cancel = context.WithCancel(context.Background())
return t
}
// Start should be called prior to processing each key which is part of the
@@ -103,6 +171,26 @@ func (t *SingleFileTracker) Finished() {
if result < 0 {
panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value")
}
// Maybe synced now?
if result == 0 && t.upstreamHasSynced.Load() {
// Mark as synced.
t.cancel()
}
}
// UpstreamHasSynced needs to be called at least once as soon as
// the upstream "has synced" becomes true. It tells SingleFileTracker
// that the source is synced.
//
// Must be called after handing over the initial list to Start.
func (t *SingleFileTracker) UpstreamHasSynced() {
// Upstream is done, but we might not be yet.
t.upstreamHasSynced.Store(true)
if atomic.LoadInt64(&t.count) == 0 {
// Mark as synced.
t.cancel()
}
}
// HasSynced returns true if the source is synced and every key present in the
@@ -110,11 +198,17 @@ func (t *SingleFileTracker) Finished() {
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *SingleFileTracker) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we don't want to then act on a
// stale count value.
if !t.UpstreamHasSynced() {
return false
}
return atomic.LoadInt64(&t.count) <= 0
return t.synced.Err() != nil
}
// Done returns a channel that is closed once the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *SingleFileTracker) Done() <-chan struct{} {
return t.synced.Done()
}
func (t *SingleFileTracker) Name() string {
return t.name
}
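// Hedged usage sketch, not part of this change: a single-threaded consumer
// driving the reworked SingleFileTracker. The batch size and function name are
// made up; only the methods shown above are assumed.
func exampleSingleFileTrackerUsage() {
	tracker := NewSingleFileTracker("example")
	// Start is called once per item of the initial batch before it is queued,
	// Finished once the item has been processed.
	const initialBatch = 3
	for i := 0; i < initialBatch; i++ {
		tracker.Start()
	}
	// The upstream reports sync only after handing over the initial batch.
	tracker.UpstreamHasSynced()
	for i := 0; i < initialBatch; i++ {
		tracker.Finished()
	}
	// Both conditions hold now: Done is closed and HasSynced returns true.
	<-tracker.Done()
	_ = tracker.HasSynced()
}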

View File

@@ -19,29 +19,24 @@ package synctrack
import (
"strings"
"sync"
"time"
"testing"
)
func testSingleFileFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) {
tracker := SingleFileTracker{
UpstreamHasSynced: upstreamHasSynced,
}
return tracker.Start, tracker.Finished, tracker.HasSynced
func testSingleFileFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) {
tracker := NewSingleFileTracker("")
return tracker.UpstreamHasSynced, tracker.Start, tracker.Finished, tracker.HasSynced, tracker.Done()
}
func testAsyncFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) {
tracker := AsyncTracker[string]{
UpstreamHasSynced: upstreamHasSynced,
}
return func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced
func testAsyncFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) {
tracker := NewAsyncTracker[string]("")
return tracker.UpstreamHasSynced, func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced, tracker.Done()
}
func TestBasicLogic(t *testing.T) {
table := []struct {
name string
construct func(func() bool) (func(), func(), func() bool)
construct func() (func(), func(), func(), func() bool, <-chan struct{})
}{
{"SingleFile", testSingleFileFuncs},
{"Async", testAsyncFuncs},
@@ -50,27 +45,87 @@ func TestBasicLogic(t *testing.T) {
for _, entry := range table {
t.Run(entry.name, func(t *testing.T) {
table := []struct {
synced bool
synced bool
syncedBeforeFinish bool
start bool
finish bool
expectSynced bool
}{
{false, true, true, false},
{true, true, false, false},
{false, true, false, false},
{true, true, true, true},
{false, false, true, true, false},
{true, false, true, false, false},
{true, true, true, false, false},
{false, false, true, false, false},
{true, false, true, true, true},
{true, true, true, true, true},
}
for _, tt := range table {
Start, Finished, HasSynced := entry.construct(func() bool { return tt.synced })
upstreamHasSynced, start, finished, hasSynced, synced := entry.construct()
syncedDone := func() bool {
select {
case <-synced:
return true
default:
return false
}
}
if hasSynced() {
t.Errorf("for %#v got HasSynced() true before start (wanted false)", tt)
}
if syncedDone() {
t.Errorf("for %#v got Done() true before start (wanted false)", tt)
}
if tt.start {
Start()
start()
}
if hasSynced() {
t.Errorf("for %#v got HasSynced() true after start (wanted false)", tt)
}
if syncedDone() {
t.Errorf("for %#v got Done() true after start (wanted false)", tt)
}
// "upstream has synced" may occur before or after finished, but not before start.
if tt.synced && tt.syncedBeforeFinish {
upstreamHasSynced()
if hasSynced() {
t.Errorf("for %#v got HasSynced() true after upstreamHasSynced and before finish (wanted false)", tt)
}
if syncedDone() {
t.Errorf("for %#v got Done() true after upstreamHasSynced and before finish (wanted false)", tt)
}
}
if tt.finish {
Finished()
finished()
}
got := HasSynced()
if e, a := tt.expectSynced, got; e != a {
t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
if tt.synced && !tt.syncedBeforeFinish {
if hasSynced() {
t.Errorf("for %#v got HasSynced() true after finish and before upstreamHasSynced (wanted false)", tt)
}
if syncedDone() {
t.Errorf("for %#v got Done() true after finish and before upstreamHasSynced (wanted false)", tt)
}
upstreamHasSynced()
}
if e, a := tt.expectSynced, hasSynced(); e != a {
t.Errorf("for %#v got HasSynced() %v (wanted %v)", tt, a, e)
}
if e, a := tt.expectSynced, syncedDone(); e != a {
t.Errorf("for %#v got Done() %v (wanted %v)", tt, a, e)
}
select {
case <-synced:
if !tt.expectSynced {
t.Errorf("for %#v got done (wanted not done)", tt)
}
default:
if tt.expectSynced {
t.Errorf("for %#v got done (wanted not done)", tt)
}
}
}
})
@@ -78,7 +133,7 @@ func TestBasicLogic(t *testing.T) {
}
func TestAsyncLocking(t *testing.T) {
aft := AsyncTracker[int]{UpstreamHasSynced: func() bool { return true }}
aft := NewAsyncTracker[int]("")
var wg sync.WaitGroup
for _, i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
@@ -92,6 +147,7 @@ func TestAsyncLocking(t *testing.T) {
}(i)
}
wg.Wait()
aft.UpstreamHasSynced()
if !aft.HasSynced() {
t.Errorf("async tracker must have made a threading error?")
}
@@ -99,7 +155,7 @@ func TestAsyncLocking(t *testing.T) {
}
func TestSingleFileCounting(t *testing.T) {
sft := SingleFileTracker{UpstreamHasSynced: func() bool { return true }}
sft := NewSingleFileTracker("")
for i := 0; i < 100; i++ {
sft.Start()
@@ -115,6 +171,8 @@ func TestSingleFileCounting(t *testing.T) {
}
sft.Finished()
sft.UpstreamHasSynced()
if !sft.HasSynced() {
t.Fatal("Unexpectedly not synced?")
}
@@ -148,24 +206,35 @@ func TestSingleFileCounting(t *testing.T) {
func TestSingleFile(t *testing.T) {
table := []struct {
synced bool
synced bool
syncedBeforeStops bool
starts int
stops int
expectSynced bool
}{
{false, 1, 1, false},
{true, 1, 0, false},
{false, 1, 0, false},
{true, 1, 1, true},
{false, false, 1, 1, false},
{true, false, 1, 0, false},
{true, true, 1, 0, false},
{false, false, 1, 0, false},
{true, false, 1, 1, true},
{true, true, 1, 1, true},
}
for _, tt := range table {
sft := SingleFileTracker{UpstreamHasSynced: func() bool { return tt.synced }}
sft := NewSingleFileTracker("")
for i := 0; i < tt.starts; i++ {
sft.Start()
}
// "upstream has synced" may occur before or after finished, but not before start.
if tt.synced && tt.syncedBeforeStops {
sft.UpstreamHasSynced()
}
for i := 0; i < tt.stops; i++ {
sft.Finished()
}
if tt.synced && !tt.syncedBeforeStops {
sft.UpstreamHasSynced()
}
got := sft.HasSynced()
if e, a := tt.expectSynced, got; e != a {
t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
@@ -173,67 +242,3 @@ func TestSingleFile(t *testing.T) {
}
}
func TestNoStaleValue(t *testing.T) {
table := []struct {
name string
construct func(func() bool) (func(), func(), func() bool)
}{
{"SingleFile", testSingleFileFuncs},
{"Async", testAsyncFuncs},
}
for _, entry := range table {
t.Run(entry.name, func(t *testing.T) {
var lock sync.Mutex
upstreamHasSynced := func() bool {
lock.Lock()
defer lock.Unlock()
return true
}
Start, Finished, HasSynced := entry.construct(upstreamHasSynced)
// Ordinarily the corresponding lock would be held and you wouldn't be
// able to call this function at this point.
if !HasSynced() {
t.Fatal("Unexpectedly not synced??")
}
Start()
if HasSynced() {
t.Fatal("Unexpectedly synced??")
}
Finished()
if !HasSynced() {
t.Fatal("Unexpectedly not synced??")
}
// Now we will prove that if the lock is held, you can't get a false
// HasSynced return.
lock.Lock()
// This goroutine calls HasSynced
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if HasSynced() {
t.Error("Unexpectedly synced??")
}
}()
// This goroutine increments + unlocks. The sleep is to bias the
// runtime such that the other goroutine usually wins (it needs to work
// in both orderings, this one is more likely to be buggy).
go func() {
time.Sleep(time.Millisecond)
Start()
lock.Unlock()
}()
wg.Wait()
})
}
}

View File

@@ -23,11 +23,20 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
utiltrace "k8s.io/utils/trace"
)
// RealFIFOOptions is the configuration parameters for RealFIFO.
type RealFIFOOptions struct {
// If set, log output will go to this logger instead of klog.Background().
// The name of the fifo gets added automatically.
Logger *klog.Logger
// Name can be used to override the default "RealFIFO" name for the new instance.
// Optional. Used only if Identifier.Name returns an empty string.
Name string
// KeyFunction is used to figure out what key an object should have. (It's
// exposed in the returned RealFIFO's keyOf() method, with additional
// handling around deleted objects and queue state).
@@ -78,11 +87,23 @@ var _ QueueWithBatch = &RealFIFO{}
// 1. delivers notifications for items that have been deleted
// 2. delivers multiple notifications per item instead of simply the most recent value
type RealFIFO struct {
// logger is a per-instance logger. This gets chosen when constructing
// the instance, with klog.Background() as default.
logger klog.Logger
// name is the name of the fifo. It is included in the logger.
name string
lock sync.RWMutex
cond sync.Cond
items []Delta
// synced is initially an open channel. It gets closed (once!) by checkSynced_locked
// as soon as the initial sync is considered complete.
synced chan struct{}
syncedClosed bool
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
@@ -145,6 +166,7 @@ type SyncAllInfo struct{}
var (
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
_ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations
_ = DoneChecker(&RealFIFO{}) // RealFIFO and implements DoneChecker.
)
// Close the queue.
@@ -181,11 +203,37 @@ func (f *RealFIFO) HasSynced() bool {
return f.hasSynced_locked()
}
// Ignoring lint to reduce the delta to the original for review. It's okay to adjust later.
//
//lint:file-ignore ST1003: should not use underscores in Go names
// HasSyncedChecker returns a checker which is done once Add/Update/Delete/AddIfNotPresent
// has been called first, or the first batch of items inserted by Replace() has been popped.
func (f *RealFIFO) HasSyncedChecker() DoneChecker {
return f
}
// Name implements [DoneChecker.Name]
func (f *RealFIFO) Name() string {
return f.name
}
// Done implements [DoneChecker.Done]
func (f *RealFIFO) Done() <-chan struct{} {
return f.synced
}
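// Hedged sketch, not part of this change: how a consumer might block on the
// DoneChecker exposed by HasSyncedChecker. The helper name and the stop
// channel are made up for illustration.
func waitForFIFOSketch(f *RealFIFO, stop <-chan struct{}) bool {
	checker := f.HasSyncedChecker()
	select {
	case <-checker.Done():
		// The initial batch has been popped, i.e. HasSynced is true now.
		return true
	case <-stop:
		f.logger.Info("Giving up waiting", "name", checker.Name())
		return false
	}
}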
// hasSynced_locked returns the result of a prior checkSynced_locked call.
func (f *RealFIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
return f.syncedClosed
}
// checkSynced_locked checks whether the initial batch of items (set via Replace) has been delivered
// and closes the synced channel as needed. It must be called after changing f.populated and/or
// f.initialPopulationCount while the mutex is still locked.
func (f *RealFIFO) checkSynced_locked() {
synced := f.populated && f.initialPopulationCount == 0
if synced && !f.syncedClosed {
// Initial sync is complete.
f.syncedClosed = true
close(f.synced)
}
}
// addToItems_locked appends to the delta list.
@@ -276,6 +324,7 @@ func (f *RealFIFO) Add(obj interface{}) error {
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
retErr := f.addToItems_locked(Added, false, obj)
return retErr
@@ -287,6 +336,7 @@ func (f *RealFIFO) Update(obj interface{}) error {
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
retErr := f.addToItems_locked(Updated, false, obj)
return retErr
@@ -300,6 +350,7 @@ func (f *RealFIFO) Delete(obj interface{}) error {
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
retErr := f.addToItems_locked(Deleted, false, obj)
return retErr
@@ -347,6 +398,7 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
defer func() {
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
f.checkSynced_locked()
}
}()
@@ -467,7 +519,6 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc
unique.Insert(id)
moveDeltaToProcessList(i)
}
f.items = f.items[len(deltas):]
// Decrement initialPopulationCount if needed.
// This is done in a defer so we only do this *after* processing is complete,
@@ -475,6 +526,7 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc
defer func() {
if f.initialPopulationCount > 0 {
f.initialPopulationCount -= len(deltas)
f.checkSynced_locked()
}
}()
@@ -524,7 +576,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error
if f.emitAtomicEvents {
err = f.addReplaceToItemsLocked(newItems, resourceVersion)
} else {
err = reconcileReplacement(f.items, f.knownObjects, newItems, f.keyOf,
err = reconcileReplacement(f.logger, f.items, f.knownObjects, newItems, f.keyOf,
func(obj DeletedFinalStateUnknown) error {
return f.addToItems_locked(Deleted, true, obj)
},
@@ -539,6 +591,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error
if !f.populated {
f.populated = true
f.initialPopulationCount = len(f.items)
f.checkSynced_locked()
}
return nil
@@ -548,6 +601,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error
// and based upon the state of the items in the queue and known objects will call onDelete and onReplace
// depending upon whether the item is being deleted or replaced/added.
func reconcileReplacement(
logger klog.Logger,
queuedItems []Delta,
knownObjects KeyListerGetter,
newItems []interface{},
@@ -623,10 +677,10 @@ func reconcileReplacement(
deletedObj, exists, err := knownObjects.GetByKey(knownKey)
if err != nil {
deletedObj = nil
utilruntime.HandleError(fmt.Errorf("error during lookup, placing DeleteFinalStateUnknown marker without object: key=%q, err=%w", knownKey, err))
utilruntime.HandleErrorWithLogger(logger, err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey)
} else if !exists {
deletedObj = nil
utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object: key=%q", knownKey))
utilruntime.HandleErrorWithLogger(logger, nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey)
}
retErr := onDelete(DeletedFinalStateUnknown{
Key: knownKey,
@@ -683,10 +737,10 @@ func (f *RealFIFO) Resync() error {
knownObj, exists, err := f.knownObjects.GetByKey(knownKey)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to queue object for sync: key=%q, err=%w", knownKey, err))
utilruntime.HandleErrorWithLogger(f.logger, err, "Unable to queue object for sync", "key", knownKey)
continue
} else if !exists {
utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, unable to queue object for sync: key=%q", knownKey))
utilruntime.HandleErrorWithLogger(f.logger, nil, "Key does not exist in known objects store, unable to queue object for sync", "key", knownKey)
continue
}
@@ -739,7 +793,10 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
}
f := &RealFIFO{
logger: klog.Background(),
name: "RealFIFO",
items: make([]Delta, 0, 10),
synced: make(chan struct{}),
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
transformer: opts.Transformer,
@@ -749,7 +806,16 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
identifier: opts.Identifier,
metrics: newFIFOMetrics(opts.Identifier, opts.MetricsProvider),
}
if opts.Logger != nil {
f.logger = *opts.Logger
}
if name := opts.Name; name != "" {
f.name = name
}
if name := opts.Identifier.Name(); name != "" {
f.name = name
}
f.logger = klog.LoggerWithName(f.logger, f.name)
f.cond.L = &f.lock
return f
}
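// Hedged aside, not part of this change: the naming precedence applied in the
// constructor above, restated as a pure helper for illustration. An explicit
// Identifier name wins over opts.Name, which wins over the default.
func effectiveFIFOName(defaultName, optionsName, identifierName string) string {
	name := defaultName
	if optionsName != "" {
		name = optionsName
	}
	if identifierName != "" {
		name = identifierName
	}
	return name
}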

246
tools/cache/wait_test.go vendored Normal file
View File

@@ -0,0 +1,246 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"reflect"
"runtime"
"sync"
"testing"
"testing/synctest"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/klog/v2"
"k8s.io/klog/v2/textlogger"
)
func init() {
// The test below is sensitive to the time zone: log output uses time.Local.
time.Local = time.UTC
}
func TestWaitFor(t *testing.T) {
for name, tc := range map[string]struct {
what string
checkers []DoneChecker
timeout time.Duration
timeoutReason string
expectDone bool
// Time is predictable and starts at the synctest epoch.
// %[1]d is the pid, %[2]d the line number of the WaitFor call.
expectOutput string
}{
"empty": {
expectDone: true,
},
"no-caches": {
what: "my-caches",
expectDone: true,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
`,
},
"no-logging": {
checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)},
expectDone: true,
},
"with-logging": {
what: "my-caches",
checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)},
expectDone: true,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last"
I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="second"
I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first"
`,
},
"some-timeout": {
timeout: time.Minute,
what: "my-caches",
checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)},
expectDone: false,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last"
I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first"
I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["second"]
`,
},
"some-canceled": {
timeout: -1,
what: "my-caches",
checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)},
expectDone: false,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last"
I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context canceled" instances=["first","second"]
`,
},
"more": {
timeoutReason: "go fish",
timeout: 5 * time.Second,
what: "my-caches",
checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)},
expectDone: false,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last"
I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="go fish" instances=["first","second"]
`,
},
"all": {
timeout: time.Minute,
what: "my-caches",
checkers: []DoneChecker{newMockChecker("first", -1), newMockChecker("second", -1), newMockChecker("last", -1)},
expectDone: false,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["first","last","second"]
`,
},
} {
t.Run(name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
var buffer bytes.Buffer
logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(&buffer)))
ctx := klog.NewContext(context.Background(), logger)
var wg sync.WaitGroup
defer wg.Wait()
if tc.timeout != 0 {
switch tc.timeoutReason {
case "":
if tc.timeout > 0 {
c, cancel := context.WithTimeout(ctx, tc.timeout)
defer cancel()
ctx = c
} else {
c, cancel := context.WithCancel(ctx)
cancel()
ctx = c
}
default:
c, cancel := context.WithCancelCause(ctx)
wg.Go(func() {
time.Sleep(tc.timeout)
cancel(errors.New(tc.timeoutReason))
})
ctx = c
}
}
_, _, line, _ := runtime.Caller(0)
done := WaitFor(ctx, tc.what, tc.checkers...)
expectOutput := tc.expectOutput
if expectOutput != "" {
expectOutput = fmt.Sprintf(expectOutput, os.Getpid(), line+1)
}
assert.Equal(t, tc.expectDone, done, "done")
assert.Equal(t, expectOutput, buffer.String(), "output")
})
})
}
}
// newMockChecker returns a checker that may be created outside of a synctest bubble.
// The checker constructs its channel inside the bubble when Done is first called.
func newMockChecker(name string, delay time.Duration) DoneChecker {
return &mockChecker{
name: name,
delay: delay,
}
}
type mockChecker struct {
name string
delay time.Duration
initialized bool
done <-chan struct{}
}
func (m *mockChecker) Name() string { return m.name }
func (m *mockChecker) Done() <-chan struct{} {
if !m.initialized {
switch {
case m.delay > 0:
// In the future.
ctx := context.Background()
// This leaks a cancel, but is hard to avoid (cannot use the parent t.Cleanup, no other way to delay calling it). Doesn't matter in a unit test.
//nolint:govet
ctx, _ = context.WithTimeout(ctx, m.delay)
m.done = ctx.Done()
case m.delay == 0:
// Immediately.
c := make(chan struct{})
close(c)
m.done = c
default:
// Never.
c := make(chan struct{})
m.done = c
}
m.initialized = true
}
return m.done
}
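// Hedged sketch, not part of this change: how production code might wait for a
// set of DoneCheckers with a bounded timeout, reusing this file's imports. The
// helper name, the 30-second budget, and the "example caches" label are made up.
func waitForCachesSketch(ctx context.Context, checkers ...DoneChecker) error {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	if !WaitFor(ctx, "example caches", checkers...) {
		// context.Cause explains why waiting stopped early.
		return fmt.Errorf("caches did not sync: %w", context.Cause(ctx))
	}
	return nil
}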
func TestSyncResult(t *testing.T) {
for name, tc := range map[string]struct {
result SyncResult
expectAsError string
}{
"empty": {},
"one": {
result: SyncResult{
Err: errors.New("my custom cancellation reason"),
Synced: map[reflect.Type]bool{
reflect.TypeFor[int](): true,
reflect.TypeFor[string](): false,
},
},
expectAsError: "failed to sync all caches: string: my custom cancellation reason",
},
"many": {
result: SyncResult{
Err: errors.New("my custom cancellation reason"),
Synced: map[reflect.Type]bool{
reflect.TypeFor[int](): false,
reflect.TypeFor[string](): false,
},
},
expectAsError: "failed to sync all caches: int, string: my custom cancellation reason",
},
} {
t.Run(name, func(t *testing.T) {
actual := tc.result.AsError()
switch {
case tc.expectAsError == "" && actual != nil:
t.Fatalf("expected no error, got %v", actual)
case tc.expectAsError != "" && actual == nil:
t.Fatalf("expected %q, got no error", actual)
case tc.expectAsError != "" && actual != nil && actual.Error() != tc.expectAsError:
t.Fatalf("expected %q, got %q", tc.expectAsError, actual.Error())
}
if tc.result.Err != nil && !errors.Is(actual, tc.result.Err) {
t.Errorf("actual error %+v should wrap %v but doesn't", actual, tc.result.Err)
}
})
}
}
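// Hedged sketch, not part of this change: turning a SyncResult into a fatal
// startup error in a caller. The helper name and wrapping message are made up;
// the SyncResult value would come from whichever component produced it.
func requireSyncedSketch(result SyncResult) error {
	if err := result.AsError(); err != nil {
		// AsError lists the types that did not sync and wraps result.Err,
		// so errors.Is(err, result.Err) holds.
		return fmt.Errorf("startup aborted: %w", err)
	}
	return nil
}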