diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 41ead09e..9ea1f494 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -72,13 +72,6 @@ type Config struct { // resync. ShouldResync ShouldResyncFunc - // If true, when Process() returns an error, re-enqueue the object. - // TODO: add interface to let you inject a delay/backoff or drop - // the object completely if desired. Pass the object in - // question to this interface as a parameter. This is probably moot - // now that this functionality appears at a higher level. - RetryOnError bool - // Called whenever the ListAndWatch drops the connection with an error. // // Contextual logging: WatchErrorHandlerWithContext should be used instead of WatchErrorHandler in code which supports contextual logging. @@ -213,15 +206,11 @@ func (c *controller) processLoop(ctx context.Context) { // TODO: Plumb through the ctx so that this can // actually exit when the controller is stopped. Or just give up on this stuff // ever being stoppable. - obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) + _, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } - if c.config.RetryOnError { - // This is the safe way to re-enqueue. - c.config.Queue.AddIfNotPresent(obj) - } } } } @@ -615,7 +604,6 @@ func newInformer(clientState Store, options InformerOptions) Controller { ObjectType: options.ObjectType, FullResyncPeriod: options.ResyncPeriod, MinWatchTimeout: options.MinWatchTimeout, - RetryOnError: false, Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index dba2dfe6..edc1c24a 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -62,7 +62,6 @@ func Example() { ListerWatcher: source, ObjectType: &v1.Pod{}, FullResyncPeriod: time.Millisecond * 100, - RetryOnError: false, // Let's implement a simple controller that just deletes // everything that comes in. diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 99ef6a01..ab1af552 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -369,30 +369,6 @@ func (f *DeltaFIFO) Delete(obj interface{}) error { return f.queueActionLocked(Deleted, obj) } -// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already -// present in the set, it is neither enqueued nor added to the set. -// -// This is useful in a single producer/consumer scenario so that the consumer can -// safely retry items without contending with the producer and potentially enqueueing -// stale items. -// -// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is -// different from the Add/Update/Delete functions. -func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { - deltas, ok := obj.(Deltas) - if !ok { - return fmt.Errorf("object must be of type deltas, but got: %#v", obj) - } - id, err := f.KeyOf(deltas) - if err != nil { - return KeyError{obj, err} - } - f.lock.Lock() - defer f.lock.Unlock() - f.addIfNotPresent(id, deltas) - return nil -} - // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller // already holds the fifo lock. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 12e67716..477574ee 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "errors" "fmt" "reflect" "runtime" @@ -881,39 +880,6 @@ func TestDeltaFIFO_detectLineJumpers(t *testing.T) { } } -func TestDeltaFIFO_addIfNotPresent(t *testing.T) { - f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) - - emptyDeltas := Deltas{} - if err := f.AddIfNotPresent(emptyDeltas); err == nil || !errors.Is(err, ErrZeroLengthDeltasObject) { - t.Errorf("Expected error '%v', got %v", ErrZeroLengthDeltasObject, err) - } - - f.Add(mkFifoObj("b", 3)) - b3 := Pop(f) - f.Add(mkFifoObj("c", 4)) - c4 := Pop(f) - if e, a := 0, len(f.items); e != a { - t.Fatalf("Expected %v, got %v items in queue", e, a) - } - - f.Add(mkFifoObj("a", 1)) - f.Add(mkFifoObj("b", 2)) - f.AddIfNotPresent(b3) - f.AddIfNotPresent(c4) - - if e, a := 3, len(f.items); a != e { - t.Fatalf("expected queue length %d, got %d", e, a) - } - - expectedValues := []int{1, 2, 4} - for _, expected := range expectedValues { - if actual := testPop(f).val; actual != expected { - t.Fatalf("expected value %d, got %d", expected, actual) - } - } -} - func TestDeltaFIFO_KeyOf(t *testing.T) { f := DeltaFIFO{keyFunc: testFifoObjectKeyFunc} diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index 9505f5ab..6df5c80f 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -64,11 +64,6 @@ type Queue interface { // Pop. Pop(PopProcessFunc) (interface{}, error) - // AddIfNotPresent puts the given accumulator into the Queue (in - // association with the accumulator's key) if and only if that key - // is not already associated with a non-empty accumulator. - AddIfNotPresent(interface{}) error - // HasSynced returns true if the first batch of keys have all been // popped. The first batch of keys are those of the first Replace // operation if that happened before any Add, AddIfNotPresent, @@ -177,23 +172,6 @@ func (f *FIFO) Add(obj interface{}) error { return nil } -// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already -// present in the set, it is neither enqueued nor added to the set. -// -// This is useful in a single producer/consumer scenario so that the consumer can -// safely retry items without contending with the producer and potentially enqueueing -// stale items. -func (f *FIFO) AddIfNotPresent(obj interface{}) error { - id, err := f.keyFunc(obj) - if err != nil { - return KeyError{obj, err} - } - f.lock.Lock() - defer f.lock.Unlock() - f.addIfNotPresent(id, obj) - return nil -} - // addIfNotPresent assumes the fifo lock is already held and adds the provided // item to the queue under id if it does not already exist. func (f *FIFO) addIfNotPresent(id string, obj interface{}) { diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index ff863a60..1f2428fb 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -248,26 +248,6 @@ func TestFIFO_detectLineJumpers(t *testing.T) { } } -func TestFIFO_addIfNotPresent(t *testing.T) { - f := NewFIFO(testFifoObjectKeyFunc) - - f.Add(mkFifoObj("a", 1)) - f.Add(mkFifoObj("b", 2)) - f.AddIfNotPresent(mkFifoObj("b", 3)) - f.AddIfNotPresent(mkFifoObj("c", 4)) - - if e, a := 3, len(f.items); a != e { - t.Fatalf("expected queue length %d, got %d", e, a) - } - - expectedValues := []int{1, 2, 4} - for _, expected := range expectedValues { - if actual := Pop(f).(testFifoObject).val; actual != expected { - t.Fatalf("expected value %d, got %d", expected, actual) - } - } -} - func TestFIFO_HasSynced(t *testing.T) { tests := []struct { actions []func(f *FIFO) diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 49dd1edc..c511a0ce 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -552,7 +552,6 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { ObjectType: s.objectType, ObjectDescription: s.objectDescription, FullResyncPeriod: s.resyncCheckPeriod, - RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas,