From 2ded6b6eb8bbb7d5db578fe9295f19af48ef5ffa Mon Sep 17 00:00:00 2001 From: Odin Ugedal Date: Wed, 8 Feb 2023 14:57:23 +0000 Subject: [PATCH] client-go/cache: fix missing delete event on replace This fixes a race condition when a "short lived" object is created and the create event is still present on the queue when a relist replaces the state. Previously that would lead in the object being leaked. The way this could happen is roughly; 1. new Object is added O, agent gets CREATED event for it 2. watch is terminated, and the agent runs a new list, L 3. CREATE event for O is still on the queue to be processed. 4. informer replaces the old data in store with L, and O is not in L - Since O is not in the store, and not in the list L, no DELETED event is queued 5. CREATE event for O is still on the queue to be processed. 6. CREATE event for O is processed 7. O is ; its present in the cache but not in k8s. With this patch, on step 4. above it would create a DELETED event ensuring that the object will be removed. Signed-off-by: Odin Ugedal Signed-off-by: Odin Ugedal Kubernetes-commit: 25d77218acdac2f793071add9ea878b08c7d328b --- tools/cache/delta_fifo.go | 27 +++++++++ tools/cache/delta_fifo_test.go | 107 +++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index c4f2de7b..7dd79f9e 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -652,6 +652,33 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { } } + // detect deletions for items in the queue that are not yet present in knownObjects + for k, oldItem := range f.items { + if keys.Has(k) { + continue + } + _, exists, _ := f.knownObjects.GetByKey(k) + if exists { + continue + } + // Delete pre-existing items not in the new list. + // This could happen if watch deletion event was missed while + // disconnected from apiserver. + var deletedObj interface{} + if n := oldItem.Newest(); n != nil { + deletedObj = n.Object + + // if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object + if d, ok := deletedObj.(DeletedFinalStateUnknown); ok { + deletedObj = d.Obj + } + } + queuedDeletions++ + if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { + return err + } + } + if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 902fcaed..b8c36a46 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -121,6 +121,113 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { } } + +func TestDeltaFIFOWithKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { + obj := mkFifoObj("foo", 2) + objV2 := mkFifoObj("foo", 3) + table := []struct { + name string + operations func(f *DeltaFIFO) + expectedDeltas Deltas + }{ + { + name: "Added object should be deleted on Replace", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Replaced object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.emitDeltaTypeReplaced = true + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Replaced, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Deleted object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + }, + }, + { + name: "Synced objects should have a single delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Sync, obj}, + {Sync, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added objects should have a single delete on multiple Replaces", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + f.Replace([]interface{}{}, "1") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added and deleted and added object should be deleted", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Add(objV2) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + {Added, objV2}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, + }, + }, + } + for _, tt := range table { + tt := tt + t.Run(tt.name, func(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{} + }), + }) + tt.operations(f) + actualDeltas := Pop(f) + if !reflect.DeepEqual(tt.expectedDeltas, actualDeltas) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltas) + } + }) + } +} + func TestDeltaFIFO_requeueOnPop(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})