mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Support AddIfNotPresent function
Add an AddIfNotPresent function to support single producer/consumer retry scenarios. Provides the consumer with a means to prefer items already enqueued by the producer at the point of retry.
This commit is contained in:
parent
c9657cad04
commit
45372a90f8
23
pkg/client/cache/fifo.go
vendored
23
pkg/client/cache/fifo.go
vendored
@ -54,6 +54,29 @@ func (f *FIFO) Add(obj interface{}) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
|
||||||
|
// present in the set, it is neither enqueued nor added to the set.
|
||||||
|
//
|
||||||
|
// This is useful in a single producer/consumer scenario so that the consumer can
|
||||||
|
// safely retry items without contending with the producer and potentially enqueueing
|
||||||
|
// stale items.
|
||||||
|
func (f *FIFO) AddIfNotPresent(obj interface{}) error {
|
||||||
|
id, err := f.keyFunc(obj)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("couldn't create key for object: %v", err)
|
||||||
|
}
|
||||||
|
f.lock.Lock()
|
||||||
|
defer f.lock.Unlock()
|
||||||
|
if _, exists := f.items[id]; exists {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
f.queue = append(f.queue, id)
|
||||||
|
f.items[id] = obj
|
||||||
|
f.cond.Broadcast()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Update is the same as Add in this implementation.
|
// Update is the same as Add in this implementation.
|
||||||
func (f *FIFO) Update(obj interface{}) error {
|
func (f *FIFO) Update(obj interface{}) error {
|
||||||
return f.Add(obj)
|
return f.Add(obj)
|
||||||
|
24
pkg/client/cache/fifo_test.go
vendored
24
pkg/client/cache/fifo_test.go
vendored
@ -160,3 +160,27 @@ func TestFIFO_detectLineJumpers(t *testing.T) {
|
|||||||
t.Fatalf("expected %d, got %d", e, a)
|
t.Fatalf("expected %d, got %d", e, a)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFIFO_addIfNotPresent(t *testing.T) {
|
||||||
|
mkObj := func(name string, val interface{}) testFifoObject {
|
||||||
|
return testFifoObject{name: name, val: val}
|
||||||
|
}
|
||||||
|
|
||||||
|
f := NewFIFO(testFifoObjectKeyFunc)
|
||||||
|
|
||||||
|
f.Add(mkObj("a", 1))
|
||||||
|
f.Add(mkObj("b", 2))
|
||||||
|
f.AddIfNotPresent(mkObj("b", 3))
|
||||||
|
f.AddIfNotPresent(mkObj("c", 4))
|
||||||
|
|
||||||
|
if e, a := 3, len(f.items); a != e {
|
||||||
|
t.Fatalf("expected queue length %d, got %d", e, a)
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedValues := []int{1, 2, 4}
|
||||||
|
for _, expected := range expectedValues {
|
||||||
|
if actual := f.Pop().(testFifoObject).val; actual != expected {
|
||||||
|
t.Fatalf("expected value %d, got %d", expected, actual)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user