From 45372a90f836f1455e5f7f983ff4894f487d46e9 Mon Sep 17 00:00:00 2001 From: Dan Mace Date: Thu, 19 Feb 2015 16:58:37 -0500 Subject: [PATCH] Support AddIfNotPresent function Add an AddIfNotPresent function to support single producer/consumer retry scenarios. Provides the consumer with a means to prefer items already enqueued by the producer at the point of retry. --- pkg/client/cache/fifo.go | 23 +++++++++++++++++++++++ pkg/client/cache/fifo_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index 42c0c42f76b..7ff644ad0d2 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -54,6 +54,29 @@ func (f *FIFO) Add(obj interface{}) error { return nil } +// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already +// present in the set, it is neither enqueued nor added to the set. +// +// This is useful in a single producer/consumer scenario so that the consumer can +// safely retry items without contending with the producer and potentially enqueueing +// stale items. +func (f *FIFO) AddIfNotPresent(obj interface{}) error { + id, err := f.keyFunc(obj) + if err != nil { + return fmt.Errorf("couldn't create key for object: %v", err) + } + f.lock.Lock() + defer f.lock.Unlock() + if _, exists := f.items[id]; exists { + return nil + } + + f.queue = append(f.queue, id) + f.items[id] = obj + f.cond.Broadcast() + return nil +} + // Update is the same as Add in this implementation. func (f *FIFO) Update(obj interface{}) error { return f.Add(obj) diff --git a/pkg/client/cache/fifo_test.go b/pkg/client/cache/fifo_test.go index 1ba6f07e094..f96ea14849b 100644 --- a/pkg/client/cache/fifo_test.go +++ b/pkg/client/cache/fifo_test.go @@ -160,3 +160,27 @@ func TestFIFO_detectLineJumpers(t *testing.T) { t.Fatalf("expected %d, got %d", e, a) } } + +func TestFIFO_addIfNotPresent(t *testing.T) { + mkObj := func(name string, val interface{}) testFifoObject { + return testFifoObject{name: name, val: val} + } + + f := NewFIFO(testFifoObjectKeyFunc) + + f.Add(mkObj("a", 1)) + f.Add(mkObj("b", 2)) + f.AddIfNotPresent(mkObj("b", 3)) + f.AddIfNotPresent(mkObj("c", 4)) + + if e, a := 3, len(f.items); a != e { + t.Fatalf("expected queue length %d, got %d", e, a) + } + + expectedValues := []int{1, 2, 4} + for _, expected := range expectedValues { + if actual := f.Pop().(testFifoObject).val; actual != expected { + t.Fatalf("expected value %d, got %d", expected, actual) + } + } +}