mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 18:31:15 +00:00
Merge pull request #115620 from odinuge/ou-relist-missing-delete
client-go/cache: fix missing delete event on replace
This commit is contained in:
commit
814faade7f
@ -571,12 +571,11 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
|||||||
// using the Sync or Replace DeltaType and then (2) it does some deletions.
|
// using the Sync or Replace DeltaType and then (2) it does some deletions.
|
||||||
// In particular: for every pre-existing key K that is not the key of
|
// In particular: for every pre-existing key K that is not the key of
|
||||||
// an object in `list` there is the effect of
|
// an object in `list` there is the effect of
|
||||||
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
|
// `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known
|
||||||
// of K. If `f.knownObjects == nil` then the pre-existing keys are
|
// object of K. The pre-existing keys are those in the union set of the keys in
|
||||||
// those in `f.items` and the current object of K is the `.Newest()`
|
// `f.items` and `f.knownObjects` (if not nil). The last known object for key K is
|
||||||
// of the Deltas associated with K. Otherwise the pre-existing keys
|
// the one present in the last delta in `f.items`. If there is no delta for K
|
||||||
// are those listed by `f.knownObjects` and the current object of K is
|
// in `f.items`, it is the object in `f.knownObjects`
|
||||||
// what `f.knownObjects.GetByKey(K)` returns.
|
|
||||||
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
|
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
@ -600,56 +599,54 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if f.knownObjects == nil {
|
// Do deletion detection against objects in the queue
|
||||||
// Do deletion detection against our own list.
|
queuedDeletions := 0
|
||||||
queuedDeletions := 0
|
for k, oldItem := range f.items {
|
||||||
for k, oldItem := range f.items {
|
if keys.Has(k) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Delete pre-existing items not in the new list.
|
||||||
|
// This could happen if watch deletion event was missed while
|
||||||
|
// disconnected from apiserver.
|
||||||
|
var deletedObj interface{}
|
||||||
|
if n := oldItem.Newest(); n != nil {
|
||||||
|
deletedObj = n.Object
|
||||||
|
|
||||||
|
// if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object
|
||||||
|
if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
|
||||||
|
deletedObj = d.Obj
|
||||||
|
}
|
||||||
|
}
|
||||||
|
queuedDeletions++
|
||||||
|
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.knownObjects != nil {
|
||||||
|
// Detect deletions for objects not present in the queue, but present in KnownObjects
|
||||||
|
knownKeys := f.knownObjects.ListKeys()
|
||||||
|
for _, k := range knownKeys {
|
||||||
if keys.Has(k) {
|
if keys.Has(k) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Delete pre-existing items not in the new list.
|
if len(f.items[k]) > 0 {
|
||||||
// This could happen if watch deletion event was missed while
|
continue
|
||||||
// disconnected from apiserver.
|
}
|
||||||
var deletedObj interface{}
|
|
||||||
if n := oldItem.Newest(); n != nil {
|
deletedObj, exists, err := f.knownObjects.GetByKey(k)
|
||||||
deletedObj = n.Object
|
if err != nil {
|
||||||
|
deletedObj = nil
|
||||||
|
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
|
||||||
|
} else if !exists {
|
||||||
|
deletedObj = nil
|
||||||
|
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
|
||||||
}
|
}
|
||||||
queuedDeletions++
|
queuedDeletions++
|
||||||
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !f.populated {
|
|
||||||
f.populated = true
|
|
||||||
// While there shouldn't be any queued deletions in the initial
|
|
||||||
// population of the queue, it's better to be on the safe side.
|
|
||||||
f.initialPopulationCount = keys.Len() + queuedDeletions
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Detect deletions not already in the queue.
|
|
||||||
knownKeys := f.knownObjects.ListKeys()
|
|
||||||
queuedDeletions := 0
|
|
||||||
for _, k := range knownKeys {
|
|
||||||
if keys.Has(k) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
deletedObj, exists, err := f.knownObjects.GetByKey(k)
|
|
||||||
if err != nil {
|
|
||||||
deletedObj = nil
|
|
||||||
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
|
|
||||||
} else if !exists {
|
|
||||||
deletedObj = nil
|
|
||||||
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
|
|
||||||
}
|
|
||||||
queuedDeletions++
|
|
||||||
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !f.populated {
|
if !f.populated {
|
||||||
|
@ -121,6 +121,130 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) {
|
||||||
|
obj := mkFifoObj("foo", 2)
|
||||||
|
objV2 := mkFifoObj("foo", 3)
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
operations func(f *DeltaFIFO)
|
||||||
|
expectedDeltas Deltas
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Added object should be deleted on Replace",
|
||||||
|
operations: func(f *DeltaFIFO) {
|
||||||
|
f.Add(obj)
|
||||||
|
f.Replace([]interface{}{}, "0")
|
||||||
|
},
|
||||||
|
expectedDeltas: Deltas{
|
||||||
|
{Added, obj},
|
||||||
|
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Replaced object should have only a single Delete",
|
||||||
|
operations: func(f *DeltaFIFO) {
|
||||||
|
f.emitDeltaTypeReplaced = true
|
||||||
|
f.Add(obj)
|
||||||
|
f.Replace([]interface{}{obj}, "0")
|
||||||
|
f.Replace([]interface{}{}, "0")
|
||||||
|
},
|
||||||
|
expectedDeltas: Deltas{
|
||||||
|
{Added, obj},
|
||||||
|
{Replaced, obj},
|
||||||
|
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Deleted object should have only a single Delete",
|
||||||
|
operations: func(f *DeltaFIFO) {
|
||||||
|
f.Add(obj)
|
||||||
|
f.Delete(obj)
|
||||||
|
f.Replace([]interface{}{}, "0")
|
||||||
|
},
|
||||||
|
expectedDeltas: Deltas{
|
||||||
|
{Added, obj},
|
||||||
|
{Deleted, obj},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Synced objects should have a single delete",
|
||||||
|
operations: func(f *DeltaFIFO) {
|
||||||
|
f.Add(obj)
|
||||||
|
f.Replace([]interface{}{obj}, "0")
|
||||||
|
f.Replace([]interface{}{obj}, "0")
|
||||||
|
f.Replace([]interface{}{}, "0")
|
||||||
|
},
|
||||||
|
expectedDeltas: Deltas{
|
||||||
|
{Added, obj},
|
||||||
|
{Sync, obj},
|
||||||
|
{Sync, obj},
|
||||||
|
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Added objects should have a single delete on multiple Replaces",
|
||||||
|
operations: func(f *DeltaFIFO) {
|
||||||
|
f.Add(obj)
|
||||||
|
f.Replace([]interface{}{}, "0")
|
||||||
|
f.Replace([]interface{}{}, "1")
|
||||||
|
},
|
||||||
|
expectedDeltas: Deltas{
|
||||||
|
{Added, obj},
|
||||||
|
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Added and deleted and added object should be deleted",
|
||||||
|
operations: func(f *DeltaFIFO) {
|
||||||
|
f.Add(obj)
|
||||||
|
f.Delete(obj)
|
||||||
|
f.Add(objV2)
|
||||||
|
f.Replace([]interface{}{}, "0")
|
||||||
|
},
|
||||||
|
expectedDeltas: Deltas{
|
||||||
|
{Added, obj},
|
||||||
|
{Deleted, obj},
|
||||||
|
{Added, objV2},
|
||||||
|
{Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range table {
|
||||||
|
tt := tt
|
||||||
|
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
// Test with a DeltaFIFO with a backing KnownObjects
|
||||||
|
fWithKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||||
|
KeyFunction: testFifoObjectKeyFunc,
|
||||||
|
KnownObjects: literalListerGetter(func() []testFifoObject {
|
||||||
|
return []testFifoObject{}
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
tt.operations(fWithKnownObjects)
|
||||||
|
actualDeltasWithKnownObjects := Pop(fWithKnownObjects)
|
||||||
|
if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithKnownObjects) {
|
||||||
|
t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithKnownObjects)
|
||||||
|
}
|
||||||
|
if len(fWithKnownObjects.items) != 0 {
|
||||||
|
t.Errorf("expected no extra deltas (empty map), got %#v", fWithKnownObjects.items)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test with a DeltaFIFO without a backing KnownObjects
|
||||||
|
fWithoutKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||||
|
KeyFunction: testFifoObjectKeyFunc,
|
||||||
|
})
|
||||||
|
tt.operations(fWithoutKnownObjects)
|
||||||
|
actualDeltasWithoutKnownObjects := Pop(fWithoutKnownObjects)
|
||||||
|
if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithoutKnownObjects) {
|
||||||
|
t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithoutKnownObjects)
|
||||||
|
}
|
||||||
|
if len(fWithoutKnownObjects.items) != 0 {
|
||||||
|
t.Errorf("expected no extra deltas (empty map), got %#v", fWithoutKnownObjects.items)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestDeltaFIFO_requeueOnPop(t *testing.T) {
|
func TestDeltaFIFO_requeueOnPop(t *testing.T) {
|
||||||
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
|
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
|
||||||
|
|
||||||
@ -371,7 +495,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
|||||||
|
|
||||||
expectedList = []Deltas{
|
expectedList = []Deltas{
|
||||||
{{Added, mkFifoObj("baz", 10)},
|
{{Added, mkFifoObj("baz", 10)},
|
||||||
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
|
{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}},
|
||||||
{{Sync, mkFifoObj("foo", 5)}},
|
{{Sync, mkFifoObj("foo", 5)}},
|
||||||
// Since "bar" didn't have a delete event and wasn't in the Replace list
|
// Since "bar" didn't have a delete event and wasn't in the Replace list
|
||||||
// it should get a tombstone key with the right Obj.
|
// it should get a tombstone key with the right Obj.
|
||||||
@ -385,6 +509,67 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Now try deleting and recreating the object in the queue, then delete it by a Replace call
|
||||||
|
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||||
|
KeyFunction: testFifoObjectKeyFunc,
|
||||||
|
KnownObjects: literalListerGetter(func() []testFifoObject {
|
||||||
|
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
f.Delete(mkFifoObj("bar", 6))
|
||||||
|
f.Add(mkFifoObj("bar", 100))
|
||||||
|
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
|
||||||
|
|
||||||
|
expectedList = []Deltas{
|
||||||
|
{
|
||||||
|
{Deleted, mkFifoObj("bar", 6)},
|
||||||
|
{Added, mkFifoObj("bar", 100)},
|
||||||
|
// Since "bar" has a newer object in the queue than in the state,
|
||||||
|
// it should get a tombstone key with the latest object from the queue
|
||||||
|
{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}},
|
||||||
|
},
|
||||||
|
{{Sync, mkFifoObj("foo", 5)}},
|
||||||
|
{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, expected := range expectedList {
|
||||||
|
cur := Pop(f).(Deltas)
|
||||||
|
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("Expected %#v, got %#v", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now try syncing it first to ensure the delete use the latest version
|
||||||
|
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||||
|
KeyFunction: testFifoObjectKeyFunc,
|
||||||
|
KnownObjects: literalListerGetter(func() []testFifoObject {
|
||||||
|
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
f.Replace([]interface{}{mkFifoObj("bar", 100), mkFifoObj("foo", 5)}, "0")
|
||||||
|
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
|
||||||
|
|
||||||
|
expectedList = []Deltas{
|
||||||
|
{
|
||||||
|
{Sync, mkFifoObj("bar", 100)},
|
||||||
|
// Since "bar" didn't have a delete event and wasn't in the Replace list
|
||||||
|
// it should get a tombstone key with the right Obj.
|
||||||
|
{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{Sync, mkFifoObj("foo", 5)},
|
||||||
|
{Sync, mkFifoObj("foo", 5)},
|
||||||
|
},
|
||||||
|
{{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, expected := range expectedList {
|
||||||
|
cur := Pop(f).(Deltas)
|
||||||
|
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
|
||||||
|
t.Errorf("Expected %#v, got %#v", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Now try starting without an explicit KeyListerGetter
|
// Now try starting without an explicit KeyListerGetter
|
||||||
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
|
f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
|
||||||
f.Add(mkFifoObj("baz", 10))
|
f.Add(mkFifoObj("baz", 10))
|
||||||
|
Loading…
Reference in New Issue
Block a user