mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-16 16:21:11 +00:00
Remove Queue.AddIfNotPresent
Logically a cache.Queue.AddIfNotPresent means that the informer can move back in time since an older item is placed after newer items. The alternative of placing errors at the head of the queue leads to indefinite memory growth and repeated failures on retry. Luckily this behavior was behind RetryOnError, which was always set to false and impossible for normal users to set to true. By removing the function and setting, impacted users (none found in a github search) will get a compile failure. Kubernetes-commit: 8e77ac000131019d5aa49c19aa1f477f6dac4d59
This commit is contained in:
parent
f29637f7f2
commit
d853ccf18c
14
tools/cache/controller.go
vendored
14
tools/cache/controller.go
vendored
@ -72,13 +72,6 @@ type Config struct {
|
||||
// resync.
|
||||
ShouldResync ShouldResyncFunc
|
||||
|
||||
// If true, when Process() returns an error, re-enqueue the object.
|
||||
// TODO: add interface to let you inject a delay/backoff or drop
|
||||
// the object completely if desired. Pass the object in
|
||||
// question to this interface as a parameter. This is probably moot
|
||||
// now that this functionality appears at a higher level.
|
||||
RetryOnError bool
|
||||
|
||||
// Called whenever the ListAndWatch drops the connection with an error.
|
||||
//
|
||||
// Contextual logging: WatchErrorHandlerWithContext should be used instead of WatchErrorHandler in code which supports contextual logging.
|
||||
@ -213,15 +206,11 @@ func (c *controller) processLoop(ctx context.Context) {
|
||||
// TODO: Plumb through the ctx so that this can
|
||||
// actually exit when the controller is stopped. Or just give up on this stuff
|
||||
// ever being stoppable.
|
||||
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
|
||||
_, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
|
||||
if err != nil {
|
||||
if err == ErrFIFOClosed {
|
||||
return
|
||||
}
|
||||
if c.config.RetryOnError {
|
||||
// This is the safe way to re-enqueue.
|
||||
c.config.Queue.AddIfNotPresent(obj)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -615,7 +604,6 @@ func newInformer(clientState Store, options InformerOptions) Controller {
|
||||
ObjectType: options.ObjectType,
|
||||
FullResyncPeriod: options.ResyncPeriod,
|
||||
MinWatchTimeout: options.MinWatchTimeout,
|
||||
RetryOnError: false,
|
||||
|
||||
Process: func(obj interface{}, isInInitialList bool) error {
|
||||
if deltas, ok := obj.(Deltas); ok {
|
||||
|
1
tools/cache/controller_test.go
vendored
1
tools/cache/controller_test.go
vendored
@ -62,7 +62,6 @@ func Example() {
|
||||
ListerWatcher: source,
|
||||
ObjectType: &v1.Pod{},
|
||||
FullResyncPeriod: time.Millisecond * 100,
|
||||
RetryOnError: false,
|
||||
|
||||
// Let's implement a simple controller that just deletes
|
||||
// everything that comes in.
|
||||
|
24
tools/cache/delta_fifo.go
vendored
24
tools/cache/delta_fifo.go
vendored
@ -369,30 +369,6 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
|
||||
return f.queueActionLocked(Deleted, obj)
|
||||
}
|
||||
|
||||
// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
|
||||
// present in the set, it is neither enqueued nor added to the set.
|
||||
//
|
||||
// This is useful in a single producer/consumer scenario so that the consumer can
|
||||
// safely retry items without contending with the producer and potentially enqueueing
|
||||
// stale items.
|
||||
//
|
||||
// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
|
||||
// different from the Add/Update/Delete functions.
|
||||
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
|
||||
deltas, ok := obj.(Deltas)
|
||||
if !ok {
|
||||
return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
|
||||
}
|
||||
id, err := f.KeyOf(deltas)
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.addIfNotPresent(id, deltas)
|
||||
return nil
|
||||
}
|
||||
|
||||
// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
|
||||
// already holds the fifo lock.
|
||||
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
|
||||
|
34
tools/cache/delta_fifo_test.go
vendored
34
tools/cache/delta_fifo_test.go
vendored
@ -17,7 +17,6 @@ limitations under the License.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime"
|
||||
@ -881,39 +880,6 @@ func TestDeltaFIFO_detectLineJumpers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_addIfNotPresent(t *testing.T) {
|
||||
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
|
||||
|
||||
emptyDeltas := Deltas{}
|
||||
if err := f.AddIfNotPresent(emptyDeltas); err == nil || !errors.Is(err, ErrZeroLengthDeltasObject) {
|
||||
t.Errorf("Expected error '%v', got %v", ErrZeroLengthDeltasObject, err)
|
||||
}
|
||||
|
||||
f.Add(mkFifoObj("b", 3))
|
||||
b3 := Pop(f)
|
||||
f.Add(mkFifoObj("c", 4))
|
||||
c4 := Pop(f)
|
||||
if e, a := 0, len(f.items); e != a {
|
||||
t.Fatalf("Expected %v, got %v items in queue", e, a)
|
||||
}
|
||||
|
||||
f.Add(mkFifoObj("a", 1))
|
||||
f.Add(mkFifoObj("b", 2))
|
||||
f.AddIfNotPresent(b3)
|
||||
f.AddIfNotPresent(c4)
|
||||
|
||||
if e, a := 3, len(f.items); a != e {
|
||||
t.Fatalf("expected queue length %d, got %d", e, a)
|
||||
}
|
||||
|
||||
expectedValues := []int{1, 2, 4}
|
||||
for _, expected := range expectedValues {
|
||||
if actual := testPop(f).val; actual != expected {
|
||||
t.Fatalf("expected value %d, got %d", expected, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_KeyOf(t *testing.T) {
|
||||
f := DeltaFIFO{keyFunc: testFifoObjectKeyFunc}
|
||||
|
||||
|
22
tools/cache/fifo.go
vendored
22
tools/cache/fifo.go
vendored
@ -64,11 +64,6 @@ type Queue interface {
|
||||
// Pop.
|
||||
Pop(PopProcessFunc) (interface{}, error)
|
||||
|
||||
// AddIfNotPresent puts the given accumulator into the Queue (in
|
||||
// association with the accumulator's key) if and only if that key
|
||||
// is not already associated with a non-empty accumulator.
|
||||
AddIfNotPresent(interface{}) error
|
||||
|
||||
// HasSynced returns true if the first batch of keys have all been
|
||||
// popped. The first batch of keys are those of the first Replace
|
||||
// operation if that happened before any Add, AddIfNotPresent,
|
||||
@ -177,23 +172,6 @@ func (f *FIFO) Add(obj interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
|
||||
// present in the set, it is neither enqueued nor added to the set.
|
||||
//
|
||||
// This is useful in a single producer/consumer scenario so that the consumer can
|
||||
// safely retry items without contending with the producer and potentially enqueueing
|
||||
// stale items.
|
||||
func (f *FIFO) AddIfNotPresent(obj interface{}) error {
|
||||
id, err := f.keyFunc(obj)
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.addIfNotPresent(id, obj)
|
||||
return nil
|
||||
}
|
||||
|
||||
// addIfNotPresent assumes the fifo lock is already held and adds the provided
|
||||
// item to the queue under id if it does not already exist.
|
||||
func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
|
||||
|
20
tools/cache/fifo_test.go
vendored
20
tools/cache/fifo_test.go
vendored
@ -248,26 +248,6 @@ func TestFIFO_detectLineJumpers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestFIFO_addIfNotPresent(t *testing.T) {
|
||||
f := NewFIFO(testFifoObjectKeyFunc)
|
||||
|
||||
f.Add(mkFifoObj("a", 1))
|
||||
f.Add(mkFifoObj("b", 2))
|
||||
f.AddIfNotPresent(mkFifoObj("b", 3))
|
||||
f.AddIfNotPresent(mkFifoObj("c", 4))
|
||||
|
||||
if e, a := 3, len(f.items); a != e {
|
||||
t.Fatalf("expected queue length %d, got %d", e, a)
|
||||
}
|
||||
|
||||
expectedValues := []int{1, 2, 4}
|
||||
for _, expected := range expectedValues {
|
||||
if actual := Pop(f).(testFifoObject).val; actual != expected {
|
||||
t.Fatalf("expected value %d, got %d", expected, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFIFO_HasSynced(t *testing.T) {
|
||||
tests := []struct {
|
||||
actions []func(f *FIFO)
|
||||
|
1
tools/cache/shared_informer.go
vendored
1
tools/cache/shared_informer.go
vendored
@ -552,7 +552,6 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
|
||||
ObjectType: s.objectType,
|
||||
ObjectDescription: s.objectDescription,
|
||||
FullResyncPeriod: s.resyncCheckPeriod,
|
||||
RetryOnError: false,
|
||||
ShouldResync: s.processor.shouldResync,
|
||||
|
||||
Process: s.HandleDeltas,
|
||||
|
Loading…
Reference in New Issue
Block a user