mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Merge pull request #27435 from wojtek-t/fix_delta_fifo_race
Fix bugs in DeltaFIFO
This commit is contained in:
commit
8bc4e9bc13
25
pkg/client/cache/delta_fifo.go
vendored
25
pkg/client/cache/delta_fifo.go
vendored
@ -189,13 +189,21 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
|
|||||||
// Don't provide a second report of the same deletion.
|
// Don't provide a second report of the same deletion.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
} else if _, exists, err := f.knownObjects.GetByKey(id); err == nil && !exists {
|
} else {
|
||||||
|
// We only want to skip the "deletion" action if the object doesn't
|
||||||
|
// exist in knownObjects and it doesn't have corresponding item in items.
|
||||||
|
// Note that even if there is a "deletion" action in items, we can ignore it,
|
||||||
|
// because it will be deduped automatically in "queueActionLocked"
|
||||||
|
_, exists, err := f.knownObjects.GetByKey(id)
|
||||||
|
_, itemsExist := f.items[id]
|
||||||
|
if err == nil && !exists && !itemsExist {
|
||||||
// Presumably, this was deleted when a relist happened.
|
// Presumably, this was deleted when a relist happened.
|
||||||
// Don't provide a second report of the same deletion.
|
// Don't provide a second report of the same deletion.
|
||||||
// TODO(lavalamp): This may be racy-- we aren't properly locked
|
// TODO(lavalamp): This may be racy-- we aren't properly locked
|
||||||
// with knownObjects.
|
// with knownObjects.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return f.queueActionLocked(Deleted, obj)
|
return f.queueActionLocked(Deleted, obj)
|
||||||
}
|
}
|
||||||
@ -270,6 +278,13 @@ func isDeletionDup(a, b *Delta) *Delta {
|
|||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// willObjectBeDeletedLocked returns true only if the last delta for the
|
||||||
|
// given object is Delete. Caller must lock first.
|
||||||
|
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
|
||||||
|
deltas := f.items[id]
|
||||||
|
return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
|
||||||
|
}
|
||||||
|
|
||||||
// queueActionLocked appends to the delta list for the object, calling
|
// queueActionLocked appends to the delta list for the object, calling
|
||||||
// f.deltaCompressor if needed. Caller must lock first.
|
// f.deltaCompressor if needed. Caller must lock first.
|
||||||
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
|
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
|
||||||
@ -277,6 +292,14 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return KeyError{obj, err}
|
return KeyError{obj, err}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If object is supposed to be deleted (last event is Deleted),
|
||||||
|
// then we should ignore Sync events, because it would result in
|
||||||
|
// recreation of this object.
|
||||||
|
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
newDeltas := append(f.items[id], Delta{actionType, obj})
|
newDeltas := append(f.items[id], Delta{actionType, obj})
|
||||||
newDeltas = dedupDeltas(newDeltas)
|
newDeltas = dedupDeltas(newDeltas)
|
||||||
if f.deltaCompressor != nil {
|
if f.deltaCompressor != nil {
|
||||||
|
62
pkg/client/cache/delta_fifo_test.go
vendored
62
pkg/client/cache/delta_fifo_test.go
vendored
@ -28,18 +28,22 @@ func testPop(f *DeltaFIFO) testFifoObject {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// keyLookupFunc adapts a raw function to be a KeyLookup.
|
// keyLookupFunc adapts a raw function to be a KeyLookup.
|
||||||
type keyLookupFunc func() []string
|
type keyLookupFunc func() []testFifoObject
|
||||||
|
|
||||||
// ListKeys just calls kl.
|
// ListKeys just calls kl.
|
||||||
func (kl keyLookupFunc) ListKeys() []string {
|
func (kl keyLookupFunc) ListKeys() []string {
|
||||||
return kl()
|
result := []string{}
|
||||||
|
for _, fifoObj := range kl() {
|
||||||
|
result = append(result, fifoObj.name)
|
||||||
|
}
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetByKey returns the key if it exists in the list returned by kl.
|
// GetByKey returns the key if it exists in the list returned by kl.
|
||||||
func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
|
func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
|
||||||
for _, v := range kl() {
|
for _, v := range kl() {
|
||||||
if v == key {
|
if v.name == key {
|
||||||
return key, true, nil
|
return v, true, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
@ -173,8 +177,8 @@ func TestDeltaFIFO_enqueueingWithLister(t *testing.T) {
|
|||||||
f := NewDeltaFIFO(
|
f := NewDeltaFIFO(
|
||||||
testFifoObjectKeyFunc,
|
testFifoObjectKeyFunc,
|
||||||
nil,
|
nil,
|
||||||
keyLookupFunc(func() []string {
|
keyLookupFunc(func() []testFifoObject {
|
||||||
return []string{"foo", "bar", "baz"}
|
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
f.Add(mkFifoObj("foo", 10))
|
f.Add(mkFifoObj("foo", 10))
|
||||||
@ -220,12 +224,52 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
|
||||||
|
f := NewDeltaFIFO(
|
||||||
|
testFifoObjectKeyFunc,
|
||||||
|
nil,
|
||||||
|
keyLookupFunc(func() []testFifoObject {
|
||||||
|
return []testFifoObject{mkFifoObj("foo", 5)}
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
f.Delete(mkFifoObj("foo", 10))
|
||||||
|
f.Resync()
|
||||||
|
|
||||||
|
deltas := f.items["foo"]
|
||||||
|
if len(deltas) != 1 {
|
||||||
|
t.Fatalf("unexpected deltas length: %v", deltas)
|
||||||
|
}
|
||||||
|
if deltas[0].Type != Deleted {
|
||||||
|
t.Errorf("unexpected delta: %v", deltas[0])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
|
||||||
|
f := NewDeltaFIFO(
|
||||||
|
testFifoObjectKeyFunc,
|
||||||
|
nil,
|
||||||
|
keyLookupFunc(func() []testFifoObject {
|
||||||
|
return []testFifoObject{}
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
f.Add(mkFifoObj("foo", 5))
|
||||||
|
f.Delete(mkFifoObj("foo", 6))
|
||||||
|
|
||||||
|
deltas := f.items["foo"]
|
||||||
|
if len(deltas) != 2 {
|
||||||
|
t.Fatalf("unexpected deltas length: %v", deltas)
|
||||||
|
}
|
||||||
|
if deltas[len(deltas)-1].Type != Deleted {
|
||||||
|
t.Errorf("unexpected delta: %v", deltas[len(deltas)-1])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
||||||
f := NewDeltaFIFO(
|
f := NewDeltaFIFO(
|
||||||
testFifoObjectKeyFunc,
|
testFifoObjectKeyFunc,
|
||||||
nil,
|
nil,
|
||||||
keyLookupFunc(func() []string {
|
keyLookupFunc(func() []testFifoObject {
|
||||||
return []string{"foo", "bar", "baz"}
|
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
f.Delete(mkFifoObj("baz", 10))
|
f.Delete(mkFifoObj("baz", 10))
|
||||||
@ -236,7 +280,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
|||||||
{{Sync, mkFifoObj("foo", 5)}},
|
{{Sync, mkFifoObj("foo", 5)}},
|
||||||
// Since "bar" didn't have a delete event and wasn't in the Replace list
|
// Since "bar" didn't have a delete event and wasn't in the Replace list
|
||||||
// it should get a tombstone key with the right Obj.
|
// it should get a tombstone key with the right Obj.
|
||||||
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: "bar"}}},
|
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, expected := range expectedList {
|
for _, expected := range expectedList {
|
||||||
|
2
pkg/client/cache/fifo.go
vendored
2
pkg/client/cache/fifo.go
vendored
@ -45,6 +45,8 @@ type Queue interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Helper function for popping from Queue.
|
// Helper function for popping from Queue.
|
||||||
|
// WARNING: Do NOT use this function in non-test code to avoid races
|
||||||
|
// unless you really really really really know what you are doing.
|
||||||
func Pop(queue Queue) interface{} {
|
func Pop(queue Queue) interface{} {
|
||||||
var result interface{}
|
var result interface{}
|
||||||
queue.Pop(func(obj interface{}) error {
|
queue.Pop(func(obj interface{}) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user