From 2132ecc28d3cd5c026fb2410a9ccf4f21e199ebe Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sun, 10 Jul 2016 17:24:00 -0400 Subject: [PATCH] Allow a FIFO client to requeue under lock The Pop method should allow a caller to requeue an item while under the fifo lock, to avoid races on deletes. --- pkg/client/cache/delta_fifo.go | 21 +++++++++++--- pkg/client/cache/delta_fifo_test.go | 45 +++++++++++++++++++++++++++++ pkg/client/cache/fifo.go | 33 +++++++++++++++++++-- pkg/client/cache/fifo_test.go | 45 +++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 7 deletions(-) diff --git a/pkg/client/cache/delta_fifo.go b/pkg/client/cache/delta_fifo.go index 5872ad3081e..5a7f4a90f62 100644 --- a/pkg/client/cache/delta_fifo.go +++ b/pkg/client/cache/delta_fifo.go @@ -228,15 +228,21 @@ func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { } f.lock.Lock() defer f.lock.Unlock() + f.addIfNotPresent(id, deltas) + return nil +} + +// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller +// already holds the fifo lock. +func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { f.populated = true if _, exists := f.items[id]; exists { - return nil + return } f.queue = append(f.queue, id) f.items[id] = deltas f.cond.Broadcast() - return nil } // re-listing and watching can deliver the same update multiple times in any @@ -387,7 +393,9 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err // is returned, so if you don't successfully process it, you need to add it back // with AddIfNotPresent(). // process function is called under lock, so it is safe update data structures -// in it that need to be in sync with the queue (e.g. knownKeys). +// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc +// may return an instance of ErrRequeue with a nested error to indicate the current +// item should be requeued (equivalent to calling AddIfNotPresent under the lock). // // Pop returns a 'Deltas', which has a complete list of all the things // that happened to the object (deltas) while it was sitting in the queue. @@ -409,9 +417,14 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { continue } delete(f.items, id) + err := process(item) + if e, ok := err.(ErrRequeue); ok { + f.addIfNotPresent(id, item) + err = e.Err + } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. - return item, process(item) + return item, err } } diff --git a/pkg/client/cache/delta_fifo_test.go b/pkg/client/cache/delta_fifo_test.go index d3dd1ddbfa5..3d087a4b0e2 100644 --- a/pkg/client/cache/delta_fifo_test.go +++ b/pkg/client/cache/delta_fifo_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "fmt" "reflect" "testing" "time" @@ -84,6 +85,50 @@ func TestDeltaFIFO_basic(t *testing.T) { } } +func TestDeltaFIFO_requeueOnPop(t *testing.T) { + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + + f.Add(mkFifoObj("foo", 10)) + _, err := f.Pop(func(obj interface{}) error { + if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { + t.Fatalf("unexpected object: %#v", obj) + } + return ErrRequeue{Err: nil} + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, ok, err := f.GetByKey("foo"); !ok || err != nil { + t.Fatalf("object should have been requeued: %t %v", ok, err) + } + + _, err = f.Pop(func(obj interface{}) error { + if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { + t.Fatalf("unexpected object: %#v", obj) + } + return ErrRequeue{Err: fmt.Errorf("test error")} + }) + if err == nil || err.Error() != "test error" { + t.Fatalf("unexpected error: %v", err) + } + if _, ok, err := f.GetByKey("foo"); !ok || err != nil { + t.Fatalf("object should have been requeued: %t %v", ok, err) + } + + _, err = f.Pop(func(obj interface{}) error { + if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { + t.Fatalf("unexpected object: %#v", obj) + } + return nil + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, ok, err := f.GetByKey("foo"); ok || err != nil { + t.Fatalf("object should have been removed: %t %v", ok, err) + } +} + func TestDeltaFIFO_compressorWorks(t *testing.T) { oldestTypes := []DeltaType{} f := NewDeltaFIFO( diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index 50c7b228436..a6d5e0ab839 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -26,12 +26,28 @@ import ( // It is supposed to process the element popped from the queue. type PopProcessFunc func(interface{}) error +// ErrRequeue may be returned by a PopProcessFunc to safely requeue +// the current item. The value of Err will be returned from Pop. +type ErrRequeue struct { + // Err is returned by the Pop function + Err error +} + +func (e ErrRequeue) Error() string { + if e.Err == nil { + return "the popped item should be requeued without returning an error" + } + return e.Err.Error() +} + // Queue is exactly like a Store, but has a Pop() method too. type Queue interface { Store // Pop blocks until it has something to process. // It returns the object that was process and the result of processing. + // The PopProcessFunc may return an ErrRequeue{...} to indicate the item + // should be requeued before releasing the lock on the queue. Pop(PopProcessFunc) (interface{}, error) // AddIfNotPresent adds a value previously @@ -129,15 +145,21 @@ func (f *FIFO) AddIfNotPresent(obj interface{}) error { } f.lock.Lock() defer f.lock.Unlock() + f.addIfNotPresent(id, obj) + return nil +} + +// addIfNotPresent assumes the fifo lock is already held and adds the the provided +// item to the queue under id if it does not already exist. +func (f *FIFO) addIfNotPresent(id string, obj interface{}) { f.populated = true if _, exists := f.items[id]; exists { - return nil + return } f.queue = append(f.queue, id) f.items[id] = obj f.cond.Broadcast() - return nil } // Update is the same as Add in this implementation. @@ -224,7 +246,12 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { continue } delete(f.items, id) - return item, process(item) + err := process(item) + if e, ok := err.(ErrRequeue); ok { + f.addIfNotPresent(id, item) + err = e.Err + } + return item, err } } diff --git a/pkg/client/cache/fifo_test.go b/pkg/client/cache/fifo_test.go index 14077dbe754..afd311d7880 100644 --- a/pkg/client/cache/fifo_test.go +++ b/pkg/client/cache/fifo_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "fmt" "reflect" "testing" "time" @@ -70,6 +71,50 @@ func TestFIFO_basic(t *testing.T) { } } +func TestFIFO_requeueOnPop(t *testing.T) { + f := NewFIFO(testFifoObjectKeyFunc) + + f.Add(mkFifoObj("foo", 10)) + _, err := f.Pop(func(obj interface{}) error { + if obj.(testFifoObject).name != "foo" { + t.Fatalf("unexpected object: %#v", obj) + } + return ErrRequeue{Err: nil} + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, ok, err := f.GetByKey("foo"); !ok || err != nil { + t.Fatalf("object should have been requeued: %t %v", ok, err) + } + + _, err = f.Pop(func(obj interface{}) error { + if obj.(testFifoObject).name != "foo" { + t.Fatalf("unexpected object: %#v", obj) + } + return ErrRequeue{Err: fmt.Errorf("test error")} + }) + if err == nil || err.Error() != "test error" { + t.Fatalf("unexpected error: %v", err) + } + if _, ok, err := f.GetByKey("foo"); !ok || err != nil { + t.Fatalf("object should have been requeued: %t %v", ok, err) + } + + _, err = f.Pop(func(obj interface{}) error { + if obj.(testFifoObject).name != "foo" { + t.Fatalf("unexpected object: %#v", obj) + } + return nil + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if _, ok, err := f.GetByKey("foo"); ok || err != nil { + t.Fatalf("object should have been removed: %t %v", ok, err) + } +} + func TestFIFO_addUpdate(t *testing.T) { f := NewFIFO(testFifoObjectKeyFunc) f.Add(mkFifoObj("foo", 10))