diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index ab1af552..264d7559 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -369,19 +369,6 @@ func (f *DeltaFIFO) Delete(obj interface{}) error { return f.queueActionLocked(Deleted, obj) } -// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller -// already holds the fifo lock. -func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { - f.populated = true - if _, exists := f.items[id]; exists { - return - } - - f.queue = append(f.queue, id) - f.items[id] = deltas - f.cond.Broadcast() -} - // re-listing and watching can deliver the same update multiple times in any // order. This will combine the most recent two deltas if they are the same. func dedupDeltas(deltas Deltas) Deltas { @@ -497,9 +484,7 @@ func (f *DeltaFIFO) IsClosed() bool { // is returned, so if you don't successfully process it, you need to add it back // with AddIfNotPresent(). // process function is called under lock, so it is safe to update data structures -// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc -// may return an instance of ErrRequeue with a nested error to indicate the current -// item should be requeued (equivalent to calling AddIfNotPresent under the lock). +// in it that need to be in sync with the queue (e.g. knownKeys). // process should avoid expensive I/O operation so that other queue operations, i.e. // Add() and Get(), won't be blocked for too long. // @@ -546,10 +531,6 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { defer trace.LogIfLong(100 * time.Millisecond) } err := process(item, isInInitialList) - if e, ok := err.(ErrRequeue); ok { - f.addIfNotPresent(id, item) - err = e.Err - } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 477574ee..8f069eb1 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -304,50 +304,6 @@ func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { } } -func TestDeltaFIFO_requeueOnPop(t *testing.T) { - f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) - - f.Add(mkFifoObj("foo", 10)) - _, err := f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: nil} - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: fmt.Errorf("test error")} - }) - if err == nil || err.Error() != "test error" { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return nil - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); ok || err != nil { - t.Fatalf("object should have been removed: %t %v", ok, err) - } -} - func TestDeltaFIFO_addUpdate(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("foo", 10)) diff --git a/tools/cache/fifo.go b/tools/cache/fifo.go index 6df5c80f..5c2ca900 100644 --- a/tools/cache/fifo.go +++ b/tools/cache/fifo.go @@ -27,23 +27,9 @@ import ( // It is supposed to process the accumulator popped from the queue. type PopProcessFunc func(obj interface{}, isInInitialList bool) error -// ErrRequeue may be returned by a PopProcessFunc to safely requeue -// the current item. The value of Err will be returned from Pop. -type ErrRequeue struct { - // Err is returned by the Pop function - Err error -} - // ErrFIFOClosed used when FIFO is closed var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue") -func (e ErrRequeue) Error() string { - if e.Err == nil { - return "the popped item should be requeued without returning an error" - } - return e.Err.Error() -} - // Queue extends ReflectorStore with a collection of Store keys to "process". // Every Add, Update, or Delete may put the object's key in that collection. // A Queue has a way to derive the corresponding key given an accumulator. @@ -172,19 +158,6 @@ func (f *FIFO) Add(obj interface{}) error { return nil } -// addIfNotPresent assumes the fifo lock is already held and adds the provided -// item to the queue under id if it does not already exist. -func (f *FIFO) addIfNotPresent(id string, obj interface{}) { - f.populated = true - if _, exists := f.items[id]; exists { - return - } - - f.queue = append(f.queue, id) - f.items[id] = obj - f.cond.Broadcast() -} - // Update is the same as Add in this implementation. func (f *FIFO) Update(obj interface{}) error { return f.Add(obj) @@ -245,10 +218,6 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { } delete(f.items, id) err := process(item, isInInitialList) - if e, ok := err.(ErrRequeue); ok { - f.addIfNotPresent(id, item) - err = e.Err - } return item, err } } diff --git a/tools/cache/fifo_test.go b/tools/cache/fifo_test.go index 1f2428fb..1831889b 100644 --- a/tools/cache/fifo_test.go +++ b/tools/cache/fifo_test.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "fmt" "reflect" "runtime" "testing" @@ -116,50 +115,6 @@ func TestFIFO_basic(t *testing.T) { } } -func TestFIFO_requeueOnPop(t *testing.T) { - f := NewFIFO(testFifoObjectKeyFunc) - - f.Add(mkFifoObj("foo", 10)) - _, err := f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: nil} - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: fmt.Errorf("test error")} - }) - if err == nil || err.Error() != "test error" { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return nil - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); ok || err != nil { - t.Fatalf("object should have been removed: %t %v", ok, err) - } -} - func TestFIFO_addUpdate(t *testing.T) { f := NewFIFO(testFifoObjectKeyFunc) f.Add(mkFifoObj("foo", 10))