From d853ccf18cc9ec7fff0c03f09150195eb7a30ed1 Mon Sep 17 00:00:00 2001 From: David Eads Date: Mon, 20 Jan 2025 13:54:32 -0500 Subject: [PATCH] Remove Queue.AddIfNotPresent Logically a cache.Queue.AddIfNotPresent means that the informer can move back in time since an older item is placed after newer items. The alternative of placing errors at the head of the queue leads to indefinite memory growth and repeated failures on retry. Luckily this behavior was behind RetryOnError, which was always set to false and impossible for normal users to set to true. By removing the function and setting, impacted users (none found in a github search) will get a compile failure. Kubernetes-commit: 8e77ac000131019d5aa49c19aa1f477f6dac4d59 --- tools/cache/controller.go | 14 +------------- tools/cache/controller_test.go | 1 - tools/cache/delta_fifo.go | 24 ------------------------ tools/cache/delta_fifo_test.go | 34 ---------------------------------- tools/cache/fifo.go | 22 ---------------------- tools/cache/fifo_test.go | 20 -------------------- tools/cache/shared_informer.go | 1 - 7 files changed, 1 insertion(+), 115 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 41ead09e..9ea1f494 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -72,13 +72,6 @@ type Config struct { // resync. ShouldResync ShouldResyncFunc - // If true, when Process() returns an error, re-enqueue the object. - // TODO: add interface to let you inject a delay/backoff or drop - // the object completely if desired. Pass the object in - // question to this interface as a parameter. This is probably moot - // now that this functionality appears at a higher level. - RetryOnError bool - // Called whenever the ListAndWatch drops the connection with an error. // // Contextual logging: WatchErrorHandlerWithContext should be used instead of WatchErrorHandler in code which supports contextual logging. @@ -213,15 +206,11 @@ func (c *controller) processLoop(ctx context.Context) { // TODO: Plumb through the ctx so that this can // actually exit when the controller is stopped. Or just give up on this stuff // ever being stoppable. - obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) + _, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } - if c.config.RetryOnError { - // This is the safe way to re-enqueue. - c.config.Queue.AddIfNotPresent(obj) - } } } } @@ -615,7 +604,6 @@ func newInformer(clientState Store, options InformerOptions) Controller { ObjectType: options.ObjectType, FullResyncPeriod: options.ResyncPeriod, MinWatchTimeout: options.MinWatchTimeout, - RetryOnError: false, Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index dba2dfe6..edc1c24a 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -62,7 +62,6 @@ func Example() { ListerWatcher: source, ObjectType: &v1.Pod{}, FullResyncPeriod: time.Millisecond * 100, - RetryOnError: false, // Let's implement a simple controller that just deletes // everything that comes in. diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 99ef6a01..ab1af552 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -369,30 +369,6 @@ func (f *DeltaFIFO) Delete(obj interface{}) error { return f.queueActionLocked(Deleted, obj) } -// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already -// present in the set, it is neither enqueued nor added to the set. -// -// This is useful in a single producer/consumer scenario so that the consumer can -// safely retry items without contending with the producer and potentially enqueueing -// stale items. -// -// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is -// different from the Add/Update/Delete functions. -func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { - deltas, ok := obj.(Deltas) - if !ok { - return fmt.Errorf("object must be of type deltas, but got: %#v", obj) - } - id, err := f.KeyOf(deltas) - if err != nil { - return KeyError{obj, err} - } - f.lock.Lock() - defer f.lock.Unlock() - f.addIfNotPresent(id, deltas) - return nil -} - // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller // already holds the fifo lock. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 12e67716..477574ee 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "errors" "fmt" "reflect" "runtime" @@ -881,39 +880,6 @@ func TestDeltaFIFO_detectLineJumpers(t *testing.T) { } } -func TestDeltaFIFO_addIfNotPresent(t *testing.T) { - f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) - - emptyDeltas := Deltas{} - if err := f.AddIfNotPresent(emptyDeltas); err == nil || !errors.Is(err, ErrZeroLengthDeltasObject) { - t.Errorf("Expected error '%v', got %v", ErrZeroLengthDeltasObject, err) - } - - f.Add(mkFifoObj("b", 3)) - b3 := Pop(f) - f.Add(mkFifoObj("c", 4)) - c4 := Pop(f) - if e, a := 0, len(f.items); e != a { - t.Fatalf("Expected %v, got %v items in queue", e, a) - } - - f.Add(mkFifoObj("a", 1)) - f.Add(mkFifoObj("b", 2)) - f.AddIfNotPresent(b3) - f.AddIfNotPresent(c4) - - if e, a := 3, len(f.items); a != e { - t.Fatalf("expected queue length %d, got %d", e, a) - } - - expectedValues := []int{1, 2, 4} - for _, expected := range expectedValues { - if actual := testPop(f).val; actual != expected { - t.Fatalf("expected value %d, got %d", expected, actual) - } - } -} - func TestDeltaFIFO_KeyOf(t *testing.T) { f := DeltaFIFO{keyFunc: testFifoObjectKeyFunc} diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index 9505f5ab..6df5c80f 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -64,11 +64,6 @@ type Queue interface { // Pop. Pop(PopProcessFunc) (interface{}, error) - // AddIfNotPresent puts the given accumulator into the Queue (in - // association with the accumulator's key) if and only if that key - // is not already associated with a non-empty accumulator. - AddIfNotPresent(interface{}) error - // HasSynced returns true if the first batch of keys have all been // popped. The first batch of keys are those of the first Replace // operation if that happened before any Add, AddIfNotPresent, @@ -177,23 +172,6 @@ func (f *FIFO) Add(obj interface{}) error { return nil } -// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already -// present in the set, it is neither enqueued nor added to the set. -// -// This is useful in a single producer/consumer scenario so that the consumer can -// safely retry items without contending with the producer and potentially enqueueing -// stale items. -func (f *FIFO) AddIfNotPresent(obj interface{}) error { - id, err := f.keyFunc(obj) - if err != nil { - return KeyError{obj, err} - } - f.lock.Lock() - defer f.lock.Unlock() - f.addIfNotPresent(id, obj) - return nil -} - // addIfNotPresent assumes the fifo lock is already held and adds the provided // item to the queue under id if it does not already exist. func (f *FIFO) addIfNotPresent(id string, obj interface{}) { diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index ff863a60..1f2428fb 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -248,26 +248,6 @@ func TestFIFO_detectLineJumpers(t *testing.T) { } } -func TestFIFO_addIfNotPresent(t *testing.T) { - f := NewFIFO(testFifoObjectKeyFunc) - - f.Add(mkFifoObj("a", 1)) - f.Add(mkFifoObj("b", 2)) - f.AddIfNotPresent(mkFifoObj("b", 3)) - f.AddIfNotPresent(mkFifoObj("c", 4)) - - if e, a := 3, len(f.items); a != e { - t.Fatalf("expected queue length %d, got %d", e, a) - } - - expectedValues := []int{1, 2, 4} - for _, expected := range expectedValues { - if actual := Pop(f).(testFifoObject).val; actual != expected { - t.Fatalf("expected value %d, got %d", expected, actual) - } - } -} - func TestFIFO_HasSynced(t *testing.T) { tests := []struct { actions []func(f *FIFO) diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 49dd1edc..c511a0ce 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -552,7 +552,6 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { ObjectType: s.objectType, ObjectDescription: s.objectDescription, FullResyncPeriod: s.resyncCheckPeriod, - RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas,