From f29637f7f240bb6e21812c735b803902fb6584aa Mon Sep 17 00:00:00 2001 From: David Eads Date: Wed, 8 Jan 2025 15:31:47 -0500 Subject: [PATCH 1/5] shrink the cache.Queue interface to what is actually used Kubernetes-commit: 0ddab1694579d523e3865c75ca44d6cdf1b0ef93 --- tools/cache/delta_fifo.go | 55 ------------------------------- tools/cache/delta_fifo_test.go | 60 ++++++++++++++++++++++++++++++++++ tools/cache/fifo.go | 44 ++----------------------- tools/cache/fifo_test.go | 44 +++++++++++++++++++++++++ tools/cache/reflector.go | 34 +++++++++++++++---- 5 files changed, 134 insertions(+), 103 deletions(-) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 4bb526cd..99ef6a01 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -508,61 +508,6 @@ func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType Del return nil } -// List returns a list of all the items; it returns the object -// from the most recent Delta. -// You should treat the items returned inside the deltas as immutable. -func (f *DeltaFIFO) List() []interface{} { - f.lock.RLock() - defer f.lock.RUnlock() - return f.listLocked() -} - -func (f *DeltaFIFO) listLocked() []interface{} { - list := make([]interface{}, 0, len(f.items)) - for _, item := range f.items { - list = append(list, item.Newest().Object) - } - return list -} - -// ListKeys returns a list of all the keys of the objects currently -// in the FIFO. -func (f *DeltaFIFO) ListKeys() []string { - f.lock.RLock() - defer f.lock.RUnlock() - list := make([]string, 0, len(f.queue)) - for _, key := range f.queue { - list = append(list, key) - } - return list -} - -// Get returns the complete list of deltas for the requested item, -// or sets exists=false. -// You should treat the items returned inside the deltas as immutable. -func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { - key, err := f.KeyOf(obj) - if err != nil { - return nil, false, KeyError{obj, err} - } - return f.GetByKey(key) -} - -// GetByKey returns the complete list of deltas for the requested item, -// setting exists=false if that list is empty. -// You should treat the items returned inside the deltas as immutable. -func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { - f.lock.RLock() - defer f.lock.RUnlock() - d, exists := f.items[key] - if exists { - // Copy item's slice so operations on this slice - // won't interfere with the object we return. - d = copyDeltas(d) - } - return d, exists, nil -} - // IsClosed checks if the queue is closed func (f *DeltaFIFO) IsClosed() bool { f.lock.Lock() diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 80994beb..12e67716 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -25,6 +25,66 @@ import ( "time" ) +// List returns a list of all the items; it returns the object +// from the most recent Delta. +// You should treat the items returned inside the deltas as immutable. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *DeltaFIFO) List() []interface{} { + f.lock.RLock() + defer f.lock.RUnlock() + return f.listLocked() +} + +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *DeltaFIFO) listLocked() []interface{} { + list := make([]interface{}, 0, len(f.items)) + for _, item := range f.items { + list = append(list, item.Newest().Object) + } + return list +} + +// ListKeys returns a list of all the keys of the objects currently +// in the FIFO. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *DeltaFIFO) ListKeys() []string { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]string, 0, len(f.queue)) + for _, key := range f.queue { + list = append(list, key) + } + return list +} + +// Get returns the complete list of deltas for the requested item, +// or sets exists=false. +// You should treat the items returned inside the deltas as immutable. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { + key, err := f.KeyOf(obj) + if err != nil { + return nil, false, KeyError{obj, err} + } + return f.GetByKey(key) +} + +// GetByKey returns the complete list of deltas for the requested item, +// setting exists=false if that list is empty. +// You should treat the items returned inside the deltas as immutable. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { + f.lock.RLock() + defer f.lock.RUnlock() + d, exists := f.items[key] + if exists { + // Copy item's slice so operations on this slice + // won't interfere with the object we return. + d = copyDeltas(d) + } + return d, exists, nil +} + // helper function to reduce stuttering func testPop(f *DeltaFIFO) testFifoObject { return Pop(f).(Deltas).Newest().Object.(testFifoObject) diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index dd13c4ea..9505f5ab 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -44,13 +44,13 @@ func (e ErrRequeue) Error() string { return e.Err.Error() } -// Queue extends Store with a collection of Store keys to "process". +// Queue extends ReflectorStore with a collection of Store keys to "process". // Every Add, Update, or Delete may put the object's key in that collection. // A Queue has a way to derive the corresponding key given an accumulator. // A Queue can be accessed concurrently from multiple goroutines. // A Queue can be "closed", after which Pop operations return an error. type Queue interface { - Store + ReflectorStore // Pop blocks until there is at least one key to process or the // Queue is closed. In the latter case Pop returns with an error. @@ -227,46 +227,6 @@ func (f *FIFO) Delete(obj interface{}) error { return err } -// List returns a list of all the items. -func (f *FIFO) List() []interface{} { - f.lock.RLock() - defer f.lock.RUnlock() - list := make([]interface{}, 0, len(f.items)) - for _, item := range f.items { - list = append(list, item) - } - return list -} - -// ListKeys returns a list of all the keys of the objects currently -// in the FIFO. -func (f *FIFO) ListKeys() []string { - f.lock.RLock() - defer f.lock.RUnlock() - list := make([]string, 0, len(f.items)) - for key := range f.items { - list = append(list, key) - } - return list -} - -// Get returns the requested item, or sets exists=false. -func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { - key, err := f.keyFunc(obj) - if err != nil { - return nil, false, KeyError{obj, err} - } - return f.GetByKey(key) -} - -// GetByKey returns the requested item, or sets exists=false. -func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { - f.lock.RLock() - defer f.lock.RUnlock() - item, exists = f.items[key] - return item, exists, nil -} - // IsClosed checks if the queue is closed func (f *FIFO) IsClosed() bool { f.lock.Lock() diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index 655f1378..ff863a60 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -24,6 +24,50 @@ import ( "time" ) +// List returns a list of all the items. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *FIFO) List() []interface{} { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]interface{}, 0, len(f.items)) + for _, item := range f.items { + list = append(list, item) + } + return list +} + +// ListKeys returns a list of all the keys of the objects currently +// in the FIFO. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *FIFO) ListKeys() []string { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]string, 0, len(f.items)) + for key := range f.items { + list = append(list, key) + } + return list +} + +// Get returns the requested item, or sets exists=false. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { + key, err := f.keyFunc(obj) + if err != nil { + return nil, false, KeyError{obj, err} + } + return f.GetByKey(key) +} + +// GetByKey returns the requested item, or sets exists=false. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { + f.lock.RLock() + defer f.lock.RUnlock() + item, exists = f.items[key] + return item, exists, nil +} + func testFifoObjectKeyFunc(obj interface{}) (string, error) { return obj.(testFifoObject).name, nil } diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index f8dac4f9..0d054df4 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -55,6 +55,28 @@ var ( defaultMinWatchTimeout = 5 * time.Minute ) +// ReflectorStore is the subset of cache.Store that the reflector uses +type ReflectorStore interface { + // Add adds the given object to the accumulator associated with the given object's key + Add(obj interface{}) error + + // Update updates the given object in the accumulator associated with the given object's key + Update(obj interface{}) error + + // Delete deletes the given object from the accumulator associated with the given object's key + Delete(obj interface{}) error + + // Replace will delete the contents of the store, using instead the + // given list. Store takes ownership of the list, you should not reference + // it after calling this function. + Replace([]interface{}, string) error + + // Resync is meaningless in the terms appearing here but has + // meaning in some implementations that have non-trivial + // additional behavior (e.g., DeltaFIFO). + Resync() error +} + // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { // name identifies this reflector. By default, it will be a file:line if possible. @@ -72,7 +94,7 @@ type Reflector struct { // The GVK of the object we expect to place in the store if unstructured. expectedGVK *schema.GroupVersionKind // The destination to sync up with the watch source - store Store + store ReflectorStore // listerWatcher is used to perform lists and watches. listerWatcher ListerWatcher // backoff manages backoff of ListWatch @@ -189,13 +211,13 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa // NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack // that is outside this package. See NewReflectorWithOptions for further information. -func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { +func NewReflector(lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector { return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod}) } // NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further // information. -func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { +func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector { return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod}) } @@ -234,7 +256,7 @@ type ReflectorOptions struct { // "yes". This enables you to use reflectors to periodically process // everything as well as incrementally processing the things that // change. -func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector { +func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store ReflectorStore, options ReflectorOptions) *Reflector { reflectorClock := options.Clock if reflectorClock == nil { reflectorClock = clock.RealClock{} @@ -798,7 +820,7 @@ func handleWatch( ctx context.Context, start time.Time, w watch.Interface, - store Store, + store ReflectorStore, expectedType reflect.Type, expectedGVK *schema.GroupVersionKind, name string, @@ -826,7 +848,7 @@ func handleAnyWatch( ctx context.Context, start time.Time, w watch.Interface, - store Store, + store ReflectorStore, expectedType reflect.Type, expectedGVK *schema.GroupVersionKind, name string, From d853ccf18cc9ec7fff0c03f09150195eb7a30ed1 Mon Sep 17 00:00:00 2001 From: David Eads Date: Mon, 20 Jan 2025 13:54:32 -0500 Subject: [PATCH 2/5] Remove Queue.AddIfNotPresent Logically a cache.Queue.AddIfNotPresent means that the informer can move back in time since an older item is placed after newer items. The alternative of placing errors at the head of the queue leads to indefinite memory growth and repeated failures on retry. Luckily this behavior was behind RetryOnError, which was always set to false and impossible for normal users to set to true. By removing the function and setting, impacted users (none found in a github search) will get a compile failure. Kubernetes-commit: 8e77ac000131019d5aa49c19aa1f477f6dac4d59 --- tools/cache/controller.go | 14 +------------- tools/cache/controller_test.go | 1 - tools/cache/delta_fifo.go | 24 ------------------------ tools/cache/delta_fifo_test.go | 34 ---------------------------------- tools/cache/fifo.go | 22 ---------------------- tools/cache/fifo_test.go | 20 -------------------- tools/cache/shared_informer.go | 1 - 7 files changed, 1 insertion(+), 115 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 41ead09e..9ea1f494 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -72,13 +72,6 @@ type Config struct { // resync. ShouldResync ShouldResyncFunc - // If true, when Process() returns an error, re-enqueue the object. - // TODO: add interface to let you inject a delay/backoff or drop - // the object completely if desired. Pass the object in - // question to this interface as a parameter. This is probably moot - // now that this functionality appears at a higher level. - RetryOnError bool - // Called whenever the ListAndWatch drops the connection with an error. // // Contextual logging: WatchErrorHandlerWithContext should be used instead of WatchErrorHandler in code which supports contextual logging. @@ -213,15 +206,11 @@ func (c *controller) processLoop(ctx context.Context) { // TODO: Plumb through the ctx so that this can // actually exit when the controller is stopped. Or just give up on this stuff // ever being stoppable. - obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) + _, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } - if c.config.RetryOnError { - // This is the safe way to re-enqueue. - c.config.Queue.AddIfNotPresent(obj) - } } } } @@ -615,7 +604,6 @@ func newInformer(clientState Store, options InformerOptions) Controller { ObjectType: options.ObjectType, FullResyncPeriod: options.ResyncPeriod, MinWatchTimeout: options.MinWatchTimeout, - RetryOnError: false, Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index dba2dfe6..edc1c24a 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -62,7 +62,6 @@ func Example() { ListerWatcher: source, ObjectType: &v1.Pod{}, FullResyncPeriod: time.Millisecond * 100, - RetryOnError: false, // Let's implement a simple controller that just deletes // everything that comes in. diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 99ef6a01..ab1af552 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -369,30 +369,6 @@ func (f *DeltaFIFO) Delete(obj interface{}) error { return f.queueActionLocked(Deleted, obj) } -// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already -// present in the set, it is neither enqueued nor added to the set. -// -// This is useful in a single producer/consumer scenario so that the consumer can -// safely retry items without contending with the producer and potentially enqueueing -// stale items. -// -// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is -// different from the Add/Update/Delete functions. -func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { - deltas, ok := obj.(Deltas) - if !ok { - return fmt.Errorf("object must be of type deltas, but got: %#v", obj) - } - id, err := f.KeyOf(deltas) - if err != nil { - return KeyError{obj, err} - } - f.lock.Lock() - defer f.lock.Unlock() - f.addIfNotPresent(id, deltas) - return nil -} - // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller // already holds the fifo lock. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 12e67716..477574ee 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "errors" "fmt" "reflect" "runtime" @@ -881,39 +880,6 @@ func TestDeltaFIFO_detectLineJumpers(t *testing.T) { } } -func TestDeltaFIFO_addIfNotPresent(t *testing.T) { - f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) - - emptyDeltas := Deltas{} - if err := f.AddIfNotPresent(emptyDeltas); err == nil || !errors.Is(err, ErrZeroLengthDeltasObject) { - t.Errorf("Expected error '%v', got %v", ErrZeroLengthDeltasObject, err) - } - - f.Add(mkFifoObj("b", 3)) - b3 := Pop(f) - f.Add(mkFifoObj("c", 4)) - c4 := Pop(f) - if e, a := 0, len(f.items); e != a { - t.Fatalf("Expected %v, got %v items in queue", e, a) - } - - f.Add(mkFifoObj("a", 1)) - f.Add(mkFifoObj("b", 2)) - f.AddIfNotPresent(b3) - f.AddIfNotPresent(c4) - - if e, a := 3, len(f.items); a != e { - t.Fatalf("expected queue length %d, got %d", e, a) - } - - expectedValues := []int{1, 2, 4} - for _, expected := range expectedValues { - if actual := testPop(f).val; actual != expected { - t.Fatalf("expected value %d, got %d", expected, actual) - } - } -} - func TestDeltaFIFO_KeyOf(t *testing.T) { f := DeltaFIFO{keyFunc: testFifoObjectKeyFunc} diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index 9505f5ab..6df5c80f 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -64,11 +64,6 @@ type Queue interface { // Pop. Pop(PopProcessFunc) (interface{}, error) - // AddIfNotPresent puts the given accumulator into the Queue (in - // association with the accumulator's key) if and only if that key - // is not already associated with a non-empty accumulator. - AddIfNotPresent(interface{}) error - // HasSynced returns true if the first batch of keys have all been // popped. The first batch of keys are those of the first Replace // operation if that happened before any Add, AddIfNotPresent, @@ -177,23 +172,6 @@ func (f *FIFO) Add(obj interface{}) error { return nil } -// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already -// present in the set, it is neither enqueued nor added to the set. -// -// This is useful in a single producer/consumer scenario so that the consumer can -// safely retry items without contending with the producer and potentially enqueueing -// stale items. -func (f *FIFO) AddIfNotPresent(obj interface{}) error { - id, err := f.keyFunc(obj) - if err != nil { - return KeyError{obj, err} - } - f.lock.Lock() - defer f.lock.Unlock() - f.addIfNotPresent(id, obj) - return nil -} - // addIfNotPresent assumes the fifo lock is already held and adds the provided // item to the queue under id if it does not already exist. func (f *FIFO) addIfNotPresent(id string, obj interface{}) { diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index ff863a60..1f2428fb 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -248,26 +248,6 @@ func TestFIFO_detectLineJumpers(t *testing.T) { } } -func TestFIFO_addIfNotPresent(t *testing.T) { - f := NewFIFO(testFifoObjectKeyFunc) - - f.Add(mkFifoObj("a", 1)) - f.Add(mkFifoObj("b", 2)) - f.AddIfNotPresent(mkFifoObj("b", 3)) - f.AddIfNotPresent(mkFifoObj("c", 4)) - - if e, a := 3, len(f.items); a != e { - t.Fatalf("expected queue length %d, got %d", e, a) - } - - expectedValues := []int{1, 2, 4} - for _, expected := range expectedValues { - if actual := Pop(f).(testFifoObject).val; actual != expected { - t.Fatalf("expected value %d, got %d", expected, actual) - } - } -} - func TestFIFO_HasSynced(t *testing.T) { tests := []struct { actions []func(f *FIFO) diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 49dd1edc..c511a0ce 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -552,7 +552,6 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { ObjectType: s.objectType, ObjectDescription: s.objectDescription, FullResyncPeriod: s.resyncCheckPeriod, - RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, From 52af3bdc0f562318e09e8f773ff7f660931d3891 Mon Sep 17 00:00:00 2001 From: David Eads Date: Mon, 20 Jan 2025 13:59:43 -0500 Subject: [PATCH 3/5] Remove cache.ErrRequeue cache.ErrRequeue advertised itself as a way to requeue failures on a FIFO, but it suffers the same problems as AddIfNotPresent. If we do requeue an item at the end, we'll move the informer back in time. If we requeue at the beginning we'll simply wedge FIFO. We didn't find examples in the wild, but by removing the error type those impacted will get a compile error and get to decide what action is most appropriate for their failure. Most of the time, proceeding to the next item is best. Kubernetes-commit: 238c32a1d9b2c72d648193fa8642a53a2884975f --- tools/cache/delta_fifo.go | 21 +--------------- tools/cache/delta_fifo_test.go | 44 --------------------------------- tools/cache/fifo.go | 31 ----------------------- tools/cache/fifo_test.go | 45 ---------------------------------- 4 files changed, 1 insertion(+), 140 deletions(-) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index ab1af552..264d7559 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -369,19 +369,6 @@ func (f *DeltaFIFO) Delete(obj interface{}) error { return f.queueActionLocked(Deleted, obj) } -// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller -// already holds the fifo lock. -func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { - f.populated = true - if _, exists := f.items[id]; exists { - return - } - - f.queue = append(f.queue, id) - f.items[id] = deltas - f.cond.Broadcast() -} - // re-listing and watching can deliver the same update multiple times in any // order. This will combine the most recent two deltas if they are the same. func dedupDeltas(deltas Deltas) Deltas { @@ -497,9 +484,7 @@ func (f *DeltaFIFO) IsClosed() bool { // is returned, so if you don't successfully process it, you need to add it back // with AddIfNotPresent(). // process function is called under lock, so it is safe to update data structures -// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc -// may return an instance of ErrRequeue with a nested error to indicate the current -// item should be requeued (equivalent to calling AddIfNotPresent under the lock). +// in it that need to be in sync with the queue (e.g. knownKeys). // process should avoid expensive I/O operation so that other queue operations, i.e. // Add() and Get(), won't be blocked for too long. // @@ -546,10 +531,6 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { defer trace.LogIfLong(100 * time.Millisecond) } err := process(item, isInInitialList) - if e, ok := err.(ErrRequeue); ok { - f.addIfNotPresent(id, item) - err = e.Err - } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 477574ee..8f069eb1 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -304,50 +304,6 @@ func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { } } -func TestDeltaFIFO_requeueOnPop(t *testing.T) { - f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) - - f.Add(mkFifoObj("foo", 10)) - _, err := f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: nil} - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: fmt.Errorf("test error")} - }) - if err == nil || err.Error() != "test error" { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return nil - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); ok || err != nil { - t.Fatalf("object should have been removed: %t %v", ok, err) - } -} - func TestDeltaFIFO_addUpdate(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("foo", 10)) diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index 6df5c80f..5c2ca900 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -27,23 +27,9 @@ import ( // It is supposed to process the accumulator popped from the queue. type PopProcessFunc func(obj interface{}, isInInitialList bool) error -// ErrRequeue may be returned by a PopProcessFunc to safely requeue -// the current item. The value of Err will be returned from Pop. -type ErrRequeue struct { - // Err is returned by the Pop function - Err error -} - // ErrFIFOClosed used when FIFO is closed var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue") -func (e ErrRequeue) Error() string { - if e.Err == nil { - return "the popped item should be requeued without returning an error" - } - return e.Err.Error() -} - // Queue extends ReflectorStore with a collection of Store keys to "process". // Every Add, Update, or Delete may put the object's key in that collection. // A Queue has a way to derive the corresponding key given an accumulator. @@ -172,19 +158,6 @@ func (f *FIFO) Add(obj interface{}) error { return nil } -// addIfNotPresent assumes the fifo lock is already held and adds the provided -// item to the queue under id if it does not already exist. -func (f *FIFO) addIfNotPresent(id string, obj interface{}) { - f.populated = true - if _, exists := f.items[id]; exists { - return - } - - f.queue = append(f.queue, id) - f.items[id] = obj - f.cond.Broadcast() -} - // Update is the same as Add in this implementation. func (f *FIFO) Update(obj interface{}) error { return f.Add(obj) @@ -245,10 +218,6 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { } delete(f.items, id) err := process(item, isInInitialList) - if e, ok := err.(ErrRequeue); ok { - f.addIfNotPresent(id, item) - err = e.Err - } return item, err } } diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index 1f2428fb..1831889b 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "fmt" "reflect" "runtime" "testing" @@ -116,50 +115,6 @@ func TestFIFO_basic(t *testing.T) { } } -func TestFIFO_requeueOnPop(t *testing.T) { - f := NewFIFO(testFifoObjectKeyFunc) - - f.Add(mkFifoObj("foo", 10)) - _, err := f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: nil} - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: fmt.Errorf("test error")} - }) - if err == nil || err.Error() != "test error" { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return nil - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); ok || err != nil { - t.Fatalf("object should have been removed: %t %v", ok, err) - } -} - func TestFIFO_addUpdate(t *testing.T) { f := NewFIFO(testFifoObjectKeyFunc) f.Add(mkFifoObj("foo", 10)) From 43bf1a1b0a88d5f06a4e6c525499aafca504eeab Mon Sep 17 00:00:00 2001 From: David Eads Date: Fri, 10 Jan 2025 16:23:07 -0500 Subject: [PATCH 4/5] make a real FIFO implementation Kubernetes-commit: 24b43ea96ec60ca2c574ccc6b51c989488ca3210 --- tools/cache/the_real_fifo.go | 407 +++++++++++++ tools/cache/the_real_fifo_test.go | 976 ++++++++++++++++++++++++++++++ 2 files changed, 1383 insertions(+) create mode 100644 tools/cache/the_real_fifo.go create mode 100644 tools/cache/the_real_fifo_test.go diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go new file mode 100644 index 00000000..9be14ff3 --- /dev/null +++ b/tools/cache/the_real_fifo.go @@ -0,0 +1,407 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + utiltrace "k8s.io/utils/trace" + "sync" + "time" +) + +// RealFIFO is a Queue in which every notification from the Reflector is passed +// in order to the Queue via Pop. +// This means that it +// 1. delivers notifications for items that have been deleted +// 2. delivers multiple notifications per item instead of simply the most recent value +type RealFIFO struct { + lock sync.RWMutex + cond sync.Cond + + items []Delta + + // populated is true if the first batch of items inserted by Replace() has been populated + // or Delete/Add/Update was called first. + populated bool + // initialPopulationCount is the number of items inserted by the first call of Replace() + initialPopulationCount int + + // keyFunc is used to make the key used for queued item insertion and retrieval, and + // should be deterministic. + keyFunc KeyFunc + + // knownObjects list keys that are "known" --- affecting Delete(), + // Replace(), and Resync() + knownObjects KeyListerGetter + + // Indication the queue is closed. + // Used to indicate a queue is closed so a control loop can exit when a queue is empty. + // Currently, not used to gate any of CRUD operations. + closed bool + + // Called with every object if non-nil. + transformer TransformFunc +} + +var ( + _ = Queue(&RealFIFO{}) // RealFIFO is a Queue +) + +// Close the queue. +func (f *RealFIFO) Close() { + f.lock.Lock() + defer f.lock.Unlock() + f.closed = true + f.cond.Broadcast() +} + +// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or +// DeletedFinalStateUnknown objects. +func (f *RealFIFO) keyOf(obj interface{}) (string, error) { + if d, ok := obj.(Deltas); ok { + if len(d) == 0 { + return "", KeyError{obj, ErrZeroLengthDeltasObject} + } + obj = d.Newest().Object + } + if d, ok := obj.(Delta); ok { + obj = d.Object + } + if d, ok := obj.(DeletedFinalStateUnknown); ok { + return d.Key, nil + } + return f.keyFunc(obj) +} + +// HasSynced returns true if an Add/Update/Delete are called first, +// or the first batch of items inserted by Replace() has been popped. +func (f *RealFIFO) HasSynced() bool { + f.lock.Lock() + defer f.lock.Unlock() + return f.hasSynced_locked() +} + +// ignoring lint to reduce delta to the original for review. It's ok adjust later. +// +//lint:file-ignore ST1003: should not use underscores in Go names +func (f *RealFIFO) hasSynced_locked() bool { + return f.populated && f.initialPopulationCount == 0 +} + +// addToItems_locked appends to the delta list. +func (f *RealFIFO) addToItems_locked(deltaActionType DeltaType, skipTransform bool, obj interface{}) error { + // we must be able to read the keys in order to determine whether the knownObjcts and the items + // in this FIFO overlap + _, err := f.keyOf(obj) + if err != nil { + return KeyError{obj, err} + } + + // Every object comes through this code path once, so this is a good + // place to call the transform func. + // + // If obj is a DeletedFinalStateUnknown tombstone or the action is a Sync, + // then the object have already gone through the transformer. + // + // If the objects already present in the cache are passed to Replace(), + // the transformer must be idempotent to avoid re-mutating them, + // or coordinate with all readers from the cache to avoid data races. + // Default informers do not pass existing objects to Replace. + if f.transformer != nil { + _, isTombstone := obj.(DeletedFinalStateUnknown) + if !isTombstone && !skipTransform { + var err error + obj, err = f.transformer(obj) + if err != nil { + return err + } + } + } + + f.items = append(f.items, Delta{ + Type: deltaActionType, + Object: obj, + }) + f.cond.Broadcast() + + return nil +} + +// Add inserts an item, and puts it in the queue. The item is only enqueued +// if it doesn't already exist in the set. +func (f *RealFIFO) Add(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + + f.populated = true + retErr := f.addToItems_locked(Added, false, obj) + + return retErr +} + +// Update is the same as Add in this implementation. +func (f *RealFIFO) Update(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + + f.populated = true + retErr := f.addToItems_locked(Updated, false, obj) + + return retErr +} + +// Delete removes an item. It doesn't add it to the queue, because +// this implementation assumes the consumer only cares about the objects, +// not the order in which they were created/added. +func (f *RealFIFO) Delete(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + + f.populated = true + retErr := f.addToItems_locked(Deleted, false, obj) + + return retErr +} + +// IsClosed checks if the queue is closed +func (f *RealFIFO) IsClosed() bool { + f.lock.Lock() + defer f.lock.Unlock() + return f.closed +} + +// Pop waits until an item is ready and processes it. If multiple items are +// ready, they are returned in the order in which they were added/updated. +// The item is removed from the queue (and the store) before it is processed. +// process function is called under lock, so it is safe +// update data structures in it that need to be in sync with the queue. +func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { + f.lock.Lock() + defer f.lock.Unlock() + + for len(f.items) == 0 { + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. + // When Close() is called, the f.closed is set and the condition is broadcasted. + // Which causes this loop to continue and return from the Pop(). + if f.closed { + return nil, ErrFIFOClosed + } + + f.cond.Wait() + } + + isInInitialList := !f.hasSynced_locked() + item := f.items[0] + // The underlying array still exists and references this object, so the object will not be garbage collected unless we zero the reference. + f.items[0] = Delta{} + f.items = f.items[1:] + if f.initialPopulationCount > 0 { + f.initialPopulationCount-- + } + + // Only log traces if the queue depth is greater than 10 and it takes more than + // 100 milliseconds to process one item from the queue. + // Queue depth never goes high because processing an item is locking the queue, + // and new items can't be added until processing finish. + // https://github.com/kubernetes/kubernetes/issues/103789 + if len(f.items) > 10 { + id, _ := f.keyOf(item) + trace := utiltrace.New("RealFIFO Pop Process", + utiltrace.Field{Key: "ID", Value: id}, + utiltrace.Field{Key: "Depth", Value: len(f.items)}, + utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"}) + defer trace.LogIfLong(100 * time.Millisecond) + } + + // we wrap in Deltas here to be compatible with preview Pop functions and those interpreting the return value. + err := process(Deltas{item}, isInInitialList) + return Deltas{item}, err +} + +// Replace +// 1. finds those items in f.items that are not in newItems and creates synthetic deletes for them +// 2. finds items in knownObjects that are not in newItems and creates synthetic deletes for them +// 3. adds the newItems to the queue +func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error { + f.lock.Lock() + defer f.lock.Unlock() + + // determine the keys of everything we're adding. We cannot add the items until after the synthetic deletes have been + // created for items that don't existing in newItems + newKeys := sets.Set[string]{} + for _, obj := range newItems { + key, err := f.keyOf(obj) + if err != nil { + return KeyError{obj, err} + } + newKeys.Insert(key) + } + + queuedItems := f.items + queuedKeys := []string{} + lastQueuedItemForKey := map[string]Delta{} + for _, queuedItem := range queuedItems { + queuedKey, err := f.keyOf(queuedItem.Object) + if err != nil { + return KeyError{queuedItem.Object, err} + } + + if _, seen := lastQueuedItemForKey[queuedKey]; !seen { + queuedKeys = append(queuedKeys, queuedKey) + } + lastQueuedItemForKey[queuedKey] = queuedItem + } + + // all the deletes already in the queue are important. There are two cases + // 1. queuedItems has delete for key/X and newItems has replace for key/X. This means the queued UID was deleted and a new one was created. + // 2. queuedItems has a delete for key/X and newItems does NOT have key/X. This means the queued item was deleted. + // Do deletion detection against objects in the queue. + for _, queuedKey := range queuedKeys { + if newKeys.Has(queuedKey) { + continue + } + + // Delete pre-existing items not in the new list. + // This could happen if watch deletion event was missed while + // disconnected from apiserver. + lastQueuedItem := lastQueuedItemForKey[queuedKey] + // if we've already got the item marked as deleted, no need to add another delete + if lastQueuedItem.Type == Deleted { + continue + } + + // if we got here, then the last entry we have for the queued item is *not* a deletion and we need to add a delete + deletedObj := lastQueuedItem.Object + + retErr := f.addToItems_locked(Deleted, true, DeletedFinalStateUnknown{ + Key: queuedKey, + Obj: deletedObj, + }) + if retErr != nil { + return fmt.Errorf("couldn't enqueue object: %w", retErr) + } + } + + // Detect deletions for objects not present in the queue, but present in KnownObjects + knownKeys := f.knownObjects.ListKeys() + for _, knownKey := range knownKeys { + if newKeys.Has(knownKey) { // still present + continue + } + if _, inQueuedItems := lastQueuedItemForKey[knownKey]; inQueuedItems { // already added delete for these + continue + } + + deletedObj, exists, err := f.knownObjects.GetByKey(knownKey) + if err != nil { + deletedObj = nil + utilruntime.HandleError(fmt.Errorf("error during lookup, placing DeleteFinalStateUnknown marker without object: key=%q, err=%w", knownKey, err)) + } else if !exists { + deletedObj = nil + utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object: key=%q", knownKey)) + } + retErr := f.addToItems_locked(Deleted, false, DeletedFinalStateUnknown{ + Key: knownKey, + Obj: deletedObj, + }) + if retErr != nil { + return fmt.Errorf("couldn't enqueue object: %w", retErr) + } + } + + // now that we have the deletes we need for items, we can add the newItems to the items queue + for _, obj := range newItems { + retErr := f.addToItems_locked(Replaced, false, obj) + if retErr != nil { + return fmt.Errorf("couldn't enqueue object: %w", retErr) + } + } + + if !f.populated { + f.populated = true + f.initialPopulationCount = len(f.items) + } + + return nil +} + +// Resync will ensure that every object in the Store has its key in the queue. +// This should be a no-op, because that property is maintained by all operations. +func (f *RealFIFO) Resync() error { + // TODO this cannot logically be done by the FIFO, it can only be done by the indexer + f.lock.Lock() + defer f.lock.Unlock() + + if f.knownObjects == nil { + return nil + } + + keysInQueue := sets.Set[string]{} + for _, item := range f.items { + key, err := f.keyOf(item.Object) + if err != nil { + return KeyError{item, err} + } + keysInQueue.Insert(key) + } + + knownKeys := f.knownObjects.ListKeys() + for _, knownKey := range knownKeys { + // If we are doing Resync() and there is already an event queued for that object, + // we ignore the Resync for it. This is to avoid the race, in which the resync + // comes with the previous value of object (since queueing an event for the object + // doesn't trigger changing the underlying store . + if keysInQueue.Has(knownKey) { + continue + } + + knownObj, exists, err := f.knownObjects.GetByKey(knownKey) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to queue object for sync: key=%q, err=%w", knownKey, err)) + continue + } else if !exists { + utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, unable to queue object for sync: key=%q", knownKey)) + continue + } + + retErr := f.addToItems_locked(Sync, true, knownObj) + if retErr != nil { + return fmt.Errorf("couldn't queue object: %w", err) + } + } + + return nil +} + +// NewRealFIFO returns a Store which can be used to queue up items to +// process. +func NewRealFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter, transformer TransformFunc) *RealFIFO { + if knownObjects == nil { + panic("coding error: knownObjects must be provided") + } + + f := &RealFIFO{ + items: make([]Delta, 0, 10), + keyFunc: keyFunc, + knownObjects: knownObjects, + transformer: transformer, + } + f.cond.L = &f.lock + return f +} diff --git a/tools/cache/the_real_fifo_test.go b/tools/cache/the_real_fifo_test.go new file mode 100644 index 00000000..649ea368 --- /dev/null +++ b/tools/cache/the_real_fifo_test.go @@ -0,0 +1,976 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "reflect" + "runtime" + "testing" + "time" +) + +func (f *RealFIFO) getItems() []Delta { + f.lock.Lock() + defer f.lock.Unlock() + + ret := make([]Delta, len(f.items)) + copy(ret, f.items) + return ret +} + +const closedFIFOName = "FIFO WAS CLOSED" + +func popN(queue Queue, count int) []interface{} { + result := []interface{}{} + for i := 0; i < count; i++ { + queue.Pop(func(obj interface{}, isInInitialList bool) error { + result = append(result, obj) + return nil + }) + } + return result +} + +// helper function to reduce stuttering +func testRealFIFOPop(f *RealFIFO) testFifoObject { + val := Pop(f) + if val == nil { + return testFifoObject{name: closedFIFOName} + } + return val.(Deltas).Newest().Object.(testFifoObject) +} + +func emptyKnownObjects() KeyListerGetter { + return literalListerGetter( + func() []testFifoObject { + return []testFifoObject{} + }, + ) +} + +func TestRealFIFO_basic(t *testing.T) { + f := NewRealFIFO(testFifoObjectKeyFunc, emptyKnownObjects(), nil) + const amount = 500 + go func() { + for i := 0; i < amount; i++ { + f.Add(mkFifoObj(string([]rune{'a', rune(i)}), i+1)) + } + }() + go func() { + for u := uint64(0); u < amount; u++ { + f.Add(mkFifoObj(string([]rune{'b', rune(u)}), u+1)) + } + }() + + lastInt := int(0) + lastUint := uint64(0) + for i := 0; i < amount*2; i++ { + switch obj := testRealFIFOPop(f).val.(type) { + case int: + if obj <= lastInt { + t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) + } + lastInt = obj + case uint64: + if obj <= lastUint { + t.Errorf("got %v (uint) out of order, last was %v", obj, lastUint) + } else { + lastUint = obj + } + default: + t.Fatalf("unexpected type %#v", obj) + } + } +} + +// TestRealFIFO_replaceWithDeleteDeltaIn tests that a `Sync` delta for an +// object `O` with ID `X` is added when .Replace is called and `O` is among the +// replacement objects even if the RealFIFO already stores in terminal position +// a delta of type `Delete` for ID `X`. Not adding the `Sync` delta causes +// SharedIndexInformers to miss `O`'s create notification, see https://github.com/kubernetes/kubernetes/issues/83810 +// for more details. +func TestRealFIFO_replaceWithDeleteDeltaIn(t *testing.T) { + oldObj := mkFifoObj("foo", 1) + newObj := mkFifoObj("foo", 2) + + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{oldObj} + }), + nil, + ) + + f.Delete(oldObj) + f.Replace([]interface{}{newObj}, "") + + actualDeltas := f.getItems() + expectedDeltas := []Delta{ + {Type: Deleted, Object: oldObj}, + {Type: Replaced, Object: newObj}, + } + if !reflect.DeepEqual(expectedDeltas, actualDeltas) { + t.Errorf("expected %#v, got %#v", expectedDeltas, actualDeltas) + } +} + +func TestRealFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { + obj := mkFifoObj("foo", 2) + objV2 := mkFifoObj("foo", 3) + table := []struct { + name string + operations func(f *RealFIFO) + expectedDeltas Deltas + }{ + { + name: "Added object should be deleted on Replace", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + //{ + // // ATTENTION: difference with delta_fifo_test, there is no option for emitDeltaTypeReplaced + // name: "Replaced object should have only a single Delete", + // operations: func(f *RealFIFO) { + // f.emitDeltaTypeReplaced = true + // f.Add(obj) + // f.Replace([]interface{}{obj}, "0") + // f.Replace([]interface{}{}, "0") + // }, + // expectedDeltas: Deltas{ + // {Added, obj}, + // {Replaced, obj}, + // {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + // }, + //}, + { + name: "Deleted object should have only a single Delete", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Delete(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + }, + }, + { + name: "Synced objects should have a single delete", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Replaced, obj}, + {Replaced, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added objects should have a single delete on multiple Replaces", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + f.Replace([]interface{}{}, "1") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added and deleted and added object should be deleted", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Delete(obj) + f.Add(objV2) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + {Added, objV2}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, + }, + }, + } + for _, tt := range table { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + // Test with a RealFIFO with a backing KnownObjects + fWithKnownObjects := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{} + }), + nil, + ) + tt.operations(fWithKnownObjects) + actualDeltasWithKnownObjects := popN(fWithKnownObjects, len(fWithKnownObjects.getItems())) + actualAsDeltas := collapseDeltas(actualDeltasWithKnownObjects) + if !reflect.DeepEqual(tt.expectedDeltas, actualAsDeltas) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualAsDeltas) + } + if len(fWithKnownObjects.items) != 0 { + t.Errorf("expected no extra deltas (empty map), got %#v", fWithKnownObjects.items) + } + + // ATTENTION: difference with delta_fifo_test, there is no option without knownObjects + }) + } +} + +func collapseDeltas(ins []interface{}) Deltas { + ret := Deltas{} + for _, curr := range ins { + for _, delta := range curr.(Deltas) { + ret = append(ret, delta) + } + } + return ret +} + +// ATTENTION: difference with delta_fifo_test, there is no requeue option anymore +// func TestDeltaFIFO_requeueOnPop(t *testing.T) { + +func TestRealFIFO_addUpdate(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("foo", 12)) + f.Delete(mkFifoObj("foo", 15)) + + // ATTENTION: difference with delta_fifo_test, all items on the list. DeltaFIFO.List only showed newest, but Pop processed all. + expected1 := []Delta{ + { + Type: Added, + Object: mkFifoObj("foo", 10), + }, + { + Type: Updated, + Object: mkFifoObj("foo", 12), + }, + { + Type: Deleted, + Object: mkFifoObj("foo", 15), + }, + } + if e, a := expected1, f.getItems(); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %+v, got %+v", e, a) + } + + got := make(chan testFifoObject, 4) + done := make(chan struct{}) + go func() { + defer close(done) + for { + obj := testRealFIFOPop(f) + if obj.name == closedFIFOName { + break + } + t.Logf("got a thing %#v", obj) + t.Logf("D len: %v", len(f.items)) + got <- obj + } + }() + + first := <-got + if e, a := 10, first.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + second := <-got + if e, a := 12, second.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + third := <-got + if e, a := 15, third.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + select { + case unexpected := <-got: + t.Errorf("Got second value %v", unexpected.val) + case <-time.After(50 * time.Millisecond): + } + + if e, a := 0, len(f.getItems()); e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + f.Close() + <-done +} + +func TestRealFIFO_transformer(t *testing.T) { + mk := func(name string, rv int) testFifoObject { + return mkFifoObj(name, &rvAndXfrm{rv, 0}) + } + xfrm := TransformFunc(func(obj interface{}) (interface{}, error) { + switch v := obj.(type) { + case testFifoObject: + v.val.(*rvAndXfrm).xfrm++ + case DeletedFinalStateUnknown: + if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 { + return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj) + } + default: + return nil, fmt.Errorf("unexpected object: %#v", obj) + } + return obj, nil + }) + + must := func(err error) { + if err != nil { + t.Fatal(err) + } + } + mustTransform := func(obj interface{}) interface{} { + ret, err := xfrm(obj) + must(err) + return ret + } + + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + xfrm, + ) + must(f.Add(mk("foo", 10))) + must(f.Add(mk("bar", 11))) + must(f.Update(mk("foo", 12))) + must(f.Delete(mk("foo", 15))) + must(f.Replace([]interface{}{}, "")) + must(f.Add(mk("bar", 16))) + must(f.Replace([]interface{}{}, "")) + + // ATTENTION: difference with delta_fifo_test, without compression, we keep all the items, including bar being deleted multiple times. + // DeltaFIFO starts by checking keys, we start by checking types and keys + expected1 := []Delta{ + {Type: Added, Object: mustTransform(mk("foo", 10))}, + {Type: Added, Object: mustTransform(mk("bar", 11))}, + {Type: Updated, Object: mustTransform(mk("foo", 12))}, + {Type: Deleted, Object: mustTransform(mk("foo", 15))}, + {Type: Deleted, Object: DeletedFinalStateUnknown{Key: "bar", Obj: mustTransform(mk("bar", 11))}}, + {Type: Added, Object: mustTransform(mk("bar", 16))}, + {Type: Deleted, Object: DeletedFinalStateUnknown{Key: "bar", Obj: mustTransform(mk("bar", 16))}}, + } + actual1 := f.getItems() + if len(expected1) != len(actual1) { + t.Fatalf("Expected %+v, got %+v", expected1, actual1) + } + for i := 0; i < len(actual1); i++ { + e := expected1[i] + a := actual1[i] + if e.Type != a.Type { + t.Errorf("%d Expected %+v, got %+v", i, e, a) + } + eKey, err := f.keyOf(e) + if err != nil { + t.Fatal(err) + } + aKey, err := f.keyOf(a) + if err != nil { + t.Fatal(err) + } + if eKey != aKey { + t.Errorf("%d Expected %+v, got %+v", i, eKey, aKey) + } + } + + for i := 0; i < len(expected1); i++ { + obj, err := f.Pop(func(o interface{}, isInInitialList bool) error { return nil }) + if err != nil { + t.Fatalf("got nothing on try %v?", i) + } + a := obj.(Deltas)[0] + e := expected1[i] + if !reflect.DeepEqual(e, a) { + t.Errorf("%d Expected %+v, got %+v", i, e, a) + } + } +} + +func TestRealFIFO_enqueueingNoLister(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("bar", 15)) + f.Add(mkFifoObj("qux", 17)) + f.Delete(mkFifoObj("qux", 18)) + + // RealFIFO queues everything + f.Delete(mkFifoObj("baz", 20)) + + // ATTENTION: difference with delta_fifo_test, without compression every item is queued + expectList := []int{10, 15, 17, 18, 20} + for _, expect := range expectList { + if e, a := expect, testRealFIFOPop(f).val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + } + if e, a := 0, len(f.getItems()); e != a { + t.Errorf("queue unexpectedly not empty: %v != %v\n%#v", e, a, f.getItems()) + } +} + +func TestRealFIFO_enqueueingWithLister(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("bar", 15)) + + // This delete does enqueue the deletion, because "baz" is in the key lister. + f.Delete(mkFifoObj("baz", 20)) + + expectList := []int{10, 15, 20} + for _, expect := range expectList { + if e, a := expect, testRealFIFOPop(f).val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + } + if e, a := 0, len(f.items); e != a { + t.Errorf("queue unexpectedly not empty: %v != %v", e, a) + } +} + +func TestRealFIFO_addReplace(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 15)}, "0") + got := make(chan testFifoObject, 3) + done := make(chan struct{}) + go func() { + defer close(done) + for { + obj := testRealFIFOPop(f) + if obj.name == closedFIFOName { + break + } + got <- obj + } + }() + + // ATTENTION: difference with delta_fifo_test, we get every event instead of the .Newest making us skip some for the test, but not at runtime. + curr := <-got + if e, a := 10, curr.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + curr = <-got + if e, a := 15, curr.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + + select { + case unexpected := <-got: + t.Errorf("Got second value %v", unexpected.val) + case <-time.After(50 * time.Millisecond): + } + + if items := f.getItems(); len(items) > 0 { + t.Errorf("item did not get removed") + } + f.Close() + <-done +} + +func TestRealFIFO_ResyncNonExisting(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + f.Delete(mkFifoObj("foo", 10)) + f.Resync() + + deltas := f.getItems() + if len(deltas) != 1 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[0].Type != Deleted { + t.Errorf("unexpected delta: %v", deltas[0]) + } +} + +func TestRealFIFO_Resync(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + f.Resync() + + deltas := f.getItems() + if len(deltas) != 1 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[0].Type != Sync { + t.Errorf("unexpected delta: %v", deltas[0]) + } +} + +func TestRealFIFO_DeleteExistingNonPropagated(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 5)) + f.Delete(mkFifoObj("foo", 6)) + + deltas := f.getItems() + if len(deltas) != 2 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[len(deltas)-1].Type != Deleted { + t.Errorf("unexpected delta: %v", deltas[len(deltas)-1]) + } +} + +func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { + // We test with only one pre-existing object because there is no + // promise about how their deletes are ordered. + + // Try it with a pre-existing Delete + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Delete(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + {{Deleted, mkFifoObj("baz", 10)}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try starting with an Add instead of a Delete + f = NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Add(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + // ATTENTION: difference with delta_fifo_test, every event is its own Deltas with one item + expectedList = []Deltas{ + {{Added, mkFifoObj("baz", 10)}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try deleting and recreating the object in the queue, then delete it by a Replace call + f = NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Delete(mkFifoObj("bar", 6)) + f.Add(mkFifoObj("bar", 100)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + // ATTENTION: difference with delta_fifo_test, every event is its own Deltas with one item + expectedList = []Deltas{ + {{Deleted, mkFifoObj("bar", 6)}}, + {{Added, mkFifoObj("bar", 100)}}, + // Since "bar" has a newer object in the queue than in the state, + // it should get a tombstone key with the latest object from the queue + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try syncing it first to ensure the delete use the latest version + f = NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Replace([]interface{}{mkFifoObj("bar", 100), mkFifoObj("foo", 5)}, "0") + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + // ATTENTION: difference with delta_fifo_test, every event is its own Deltas with one item + // ATTENTION: difference with delta_fifo_test, deltaFifo associated by key, but realFIFO orders across all keys, so this ordering changed + expectedList = []Deltas{ + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "baz" didn't have a delete event and wasn't in the Replace list + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {{Replaced, mkFifoObj("bar", 100)}}, + {{Replaced, mkFifoObj("foo", 5)}}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for i, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("%d Expected %#v, got %#v", i, e, a) + } + } + + // Now try starting without an explicit KeyListerGetter + f = NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList = []Deltas{ + {{Added, mkFifoObj("baz", 10)}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +// TestRealFIFO_ReplaceMakesDeletionsReplaced is the same as the above test, but +// ensures that a Replaced DeltaType is emitted. +func TestRealFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + + f.Delete(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 6)}, "0") + + expectedList := []Deltas{ + {{Deleted, mkFifoObj("baz", 10)}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Replaced, mkFifoObj("foo", 6)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +// ATTENTION: difference with delta_fifo_test, the previous value was hardcoded as use "Replace" so I've eliminated the option to set it differently +//func TestRealFIFO_ReplaceDeltaType(t *testing.T) { + +func TestRealFIFO_UpdateResyncRace(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + f.Update(mkFifoObj("foo", 6)) + f.Resync() + + expectedList := []Deltas{ + {{Updated, mkFifoObj("foo", 6)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +func TestRealFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + if f.HasSynced() { + t.Errorf("Expected HasSynced to be false") + } + cur, initial := pop2[Deltas](f) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + if initial != true { + t.Error("Expected initial list item") + } + } + if !f.HasSynced() { + t.Errorf("Expected HasSynced to be true") + } +} + +func TestRealFIFO_detectLineJumpers(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + + f.Add(mkFifoObj("foo", 10)) + f.Add(mkFifoObj("bar", 1)) + f.Add(mkFifoObj("foo", 11)) + f.Add(mkFifoObj("foo", 13)) + f.Add(mkFifoObj("zab", 30)) + + // ATTENTION: difference with delta_fifo_test, every event is delivered in order + + if e, a := 10, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + + f.Add(mkFifoObj("foo", 14)) // ensure foo doesn't jump back in line + + if e, a := 1, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 11, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 13, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 30, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 14, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } +} + +func TestRealFIFO_KeyOf(t *testing.T) { + f := RealFIFO{keyFunc: testFifoObjectKeyFunc} + + table := []struct { + obj interface{} + key string + }{ + {obj: testFifoObject{name: "A"}, key: "A"}, + {obj: DeletedFinalStateUnknown{Key: "B", Obj: nil}, key: "B"}, + {obj: Deltas{{Object: testFifoObject{name: "C"}}}, key: "C"}, + {obj: Deltas{{Object: DeletedFinalStateUnknown{Key: "D", Obj: nil}}}, key: "D"}, + } + + for _, item := range table { + got, err := f.keyOf(item.obj) + if err != nil { + t.Errorf("Unexpected error for %q: %v", item.obj, err) + continue + } + if e, a := item.key, got; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +} + +func TestRealFIFO_HasSynced(t *testing.T) { + tests := []struct { + actions []func(f *RealFIFO) + expectedSynced bool + }{ + { + actions: []func(f *RealFIFO){}, + expectedSynced: false, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Add(mkFifoObj("a", 1)) }, + }, + expectedSynced: true, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{}, "0") }, + }, + expectedSynced: true, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + }, + expectedSynced: false, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + func(f *RealFIFO) { Pop(f) }, + }, + expectedSynced: false, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + func(f *RealFIFO) { Pop(f) }, + func(f *RealFIFO) { Pop(f) }, + }, + expectedSynced: true, + }, + { + // This test case won't happen in practice since a Reflector, the only producer for delta_fifo today, always passes a complete snapshot consistent in time; + // there cannot be duplicate keys in the list or apiserver is broken. + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("a", 2)}, "0") }, + func(f *RealFIFO) { Pop(f) }, + // ATTENTION: difference with delta_fifo_test, every event is delivered, so a is listed twice and must be popped twice to remove both + func(f *RealFIFO) { Pop(f) }, + }, + expectedSynced: true, + }, + } + + for i, test := range tests { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + + for _, action := range test.actions { + action(f) + } + if e, a := test.expectedSynced, f.HasSynced(); a != e { + t.Errorf("test case %v failed, expected: %v , got %v", i, e, a) + } + } +} + +// TestRealFIFO_PopShouldUnblockWhenClosed checks that any blocking Pop on an empty queue +// should unblock and return after Close is called. +func TestRealFIFO_PopShouldUnblockWhenClosed(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + + c := make(chan struct{}) + const jobs = 10 + for i := 0; i < jobs; i++ { + go func() { + f.Pop(func(obj interface{}, isInInitialList bool) error { + return nil + }) + c <- struct{}{} + }() + } + + runtime.Gosched() + f.Close() + + for i := 0; i < jobs; i++ { + select { + case <-c: + case <-time.After(500 * time.Millisecond): + t.Fatalf("timed out waiting for Pop to return after Close") + } + } +} From f2d9cfb8c84be89826a55782c904908e7783d004 Mon Sep 17 00:00:00 2001 From: David Eads Date: Fri, 10 Jan 2025 16:23:23 -0500 Subject: [PATCH 5/5] switch to using the real FIFO Kubernetes-commit: a9aab298b4738f4ea9111131cdf193a3b1ba14e5 --- features/known_features.go | 7 +++++++ tools/cache/controller.go | 17 ++++++++++++----- tools/cache/controller_test.go | 7 ++----- tools/cache/shared_informer.go | 15 ++++++++++----- 4 files changed, 31 insertions(+), 15 deletions(-) diff --git a/features/known_features.go b/features/known_features.go index a74f6a83..344d2ebb 100644 --- a/features/known_features.go +++ b/features/known_features.go @@ -53,6 +53,12 @@ const ( // alpha: v1.30 InformerResourceVersion Feature = "InformerResourceVersion" + // owner: @deads2k + // beta: v1.33 + // + // Refactor informers to deliver watch stream events in order instead of out of order. + InOrderInformers Feature = "InOrderInformers" + // owner: @p0lyn0mial // beta: v1.30 // @@ -73,5 +79,6 @@ var defaultKubernetesFeatureGates = map[Feature]FeatureSpec{ ClientsAllowCBOR: {Default: false, PreRelease: Alpha}, ClientsPreferCBOR: {Default: false, PreRelease: Alpha}, InformerResourceVersion: {Default: false, PreRelease: Alpha}, + InOrderInformers: {Default: true, PreRelease: Beta}, WatchListClient: {Default: false, PreRelease: Beta}, } diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 9ea1f494..1497700d 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -19,6 +19,7 @@ package cache import ( "context" "errors" + clientgofeaturegate "k8s.io/client-go/features" "sync" "time" @@ -592,11 +593,17 @@ func newInformer(clientState Store, options InformerOptions) Controller { // This will hold incoming changes. Note how we pass clientState in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KnownObjects: clientState, - EmitDeltaTypeReplaced: true, - Transformer: options.Transform, - }) + + var fifo Queue + if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { + fifo = NewRealFIFO(MetaNamespaceKeyFunc, clientState, options.Transform) + } else { + fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: clientState, + EmitDeltaTypeReplaced: true, + Transformer: options.Transform, + }) + } cfg := &Config{ Queue: fifo, diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index edc1c24a..05425792 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -49,10 +49,7 @@ func Example() { // This will hold incoming changes. Note how we pass downstream in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KeyFunction: MetaNamespaceKeyFunc, - KnownObjects: downstream, - }) + fifo := NewRealFIFO(MetaNamespaceKeyFunc, downstream, nil) // Let's do threadsafe output to get predictable test results. deletionCounter := make(chan string, 1000) @@ -87,7 +84,7 @@ func Example() { // fifo's KeyOf is easiest, because it handles // DeletedFinalStateUnknown markers. - key, err := fifo.KeyOf(newest.Object) + key, err := fifo.keyOf(newest.Object) if err != nil { return err } diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index c511a0ce..a8156a28 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -540,11 +540,16 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { s.startedLock.Lock() defer s.startedLock.Unlock() - fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KnownObjects: s.indexer, - EmitDeltaTypeReplaced: true, - Transformer: s.transform, - }) + var fifo Queue + if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { + fifo = NewRealFIFO(MetaNamespaceKeyFunc, s.indexer, s.transform) + } else { + fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: s.indexer, + EmitDeltaTypeReplaced: true, + Transformer: s.transform, + }) + } cfg := &Config{ Queue: fifo,