mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
Fix line-jumping bug in FIFO implementation
Keep the FIFO's internal set in sync with the queue during Add/Update operations to prevent a queue line-jumping scenario (described in a new unit test).
This commit is contained in:
parent
893e897d9b
commit
51ec53e2af
20
pkg/client/cache/fifo.go
vendored
20
pkg/client/cache/fifo.go
vendored
@ -28,28 +28,28 @@ import (
|
|||||||
// processed once, and when it is processed, the most recent version will be
|
// processed once, and when it is processed, the most recent version will be
|
||||||
// processed. This can't be done with a channel.
|
// processed. This can't be done with a channel.
|
||||||
type FIFO struct {
|
type FIFO struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
cond sync.Cond
|
cond sync.Cond
|
||||||
|
// We depend on the property that items in the set are in the queue and vice versa.
|
||||||
items map[string]interface{}
|
items map[string]interface{}
|
||||||
queue []string
|
queue []string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add inserts an item, and puts it in the queue.
|
// Add inserts an item, and puts it in the queue. The item is only enqueued
|
||||||
|
// if it doesn't already exist in the set.
|
||||||
func (f *FIFO) Add(id string, obj interface{}) {
|
func (f *FIFO) Add(id string, obj interface{}) {
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
if _, exists := f.items[id]; !exists {
|
||||||
|
f.queue = append(f.queue, id)
|
||||||
|
}
|
||||||
f.items[id] = obj
|
f.items[id] = obj
|
||||||
f.queue = append(f.queue, id)
|
|
||||||
f.cond.Broadcast()
|
f.cond.Broadcast()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update updates an item, and adds it to the queue.
|
// Update is the same as Add in this implementation.
|
||||||
func (f *FIFO) Update(id string, obj interface{}) {
|
func (f *FIFO) Update(id string, obj interface{}) {
|
||||||
f.lock.Lock()
|
f.Add(id, obj)
|
||||||
defer f.lock.Unlock()
|
|
||||||
f.items[id] = obj
|
|
||||||
f.queue = append(f.queue, id)
|
|
||||||
f.cond.Broadcast()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete removes an item. It doesn't add it to the queue, because
|
// Delete removes an item. It doesn't add it to the queue, because
|
||||||
|
28
pkg/client/cache/fifo_test.go
vendored
28
pkg/client/cache/fifo_test.go
vendored
@ -107,3 +107,31 @@ func TestFIFO_addReplace(t *testing.T) {
|
|||||||
t.Errorf("item did not get removed")
|
t.Errorf("item did not get removed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFIFO_detectLineJumpers(t *testing.T) {
|
||||||
|
f := NewFIFO()
|
||||||
|
|
||||||
|
f.Add("foo", 10)
|
||||||
|
f.Add("bar", 1)
|
||||||
|
f.Add("foo", 11)
|
||||||
|
f.Add("foo", 13)
|
||||||
|
f.Add("zab", 30)
|
||||||
|
|
||||||
|
if e, a := 13, f.Pop().(int); a != e {
|
||||||
|
t.Fatalf("expected %d, got %d", e, a)
|
||||||
|
}
|
||||||
|
|
||||||
|
f.Add("foo", 14) // ensure foo doesn't jump back in line
|
||||||
|
|
||||||
|
if e, a := 1, f.Pop().(int); a != e {
|
||||||
|
t.Fatalf("expected %d, got %d", e, a)
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, a := 30, f.Pop().(int); a != e {
|
||||||
|
t.Fatalf("expected %d, got %d", e, a)
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, a := 14, f.Pop().(int); a != e {
|
||||||
|
t.Fatalf("expected %d, got %d", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user