From 25d77218acdac2f793071add9ea878b08c7d328b Mon Sep 17 00:00:00 2001 From: Odin Ugedal Date: Wed, 8 Feb 2023 14:57:23 +0000 Subject: [PATCH 1/5] client-go/cache: fix missing delete event on replace This fixes a race condition when a "short lived" object is created and the create event is still present on the queue when a relist replaces the state. Previously that would lead in the object being leaked. The way this could happen is roughly; 1. new Object is added O, agent gets CREATED event for it 2. watch is terminated, and the agent runs a new list, L 3. CREATE event for O is still on the queue to be processed. 4. informer replaces the old data in store with L, and O is not in L - Since O is not in the store, and not in the list L, no DELETED event is queued 5. CREATE event for O is still on the queue to be processed. 6. CREATE event for O is processed 7. O is ; its present in the cache but not in k8s. With this patch, on step 4. above it would create a DELETED event ensuring that the object will be removed. Signed-off-by: Odin Ugedal Signed-off-by: Odin Ugedal --- .../client-go/tools/cache/delta_fifo.go | 27 +++++ .../client-go/tools/cache/delta_fifo_test.go | 107 ++++++++++++++++++ 2 files changed, 134 insertions(+) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index c4f2de7b251..7dd79f9eb28 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -652,6 +652,33 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { } } + // detect deletions for items in the queue that are not yet present in knownObjects + for k, oldItem := range f.items { + if keys.Has(k) { + continue + } + _, exists, _ := f.knownObjects.GetByKey(k) + if exists { + continue + } + // Delete pre-existing items not in the new list. + // This could happen if watch deletion event was missed while + // disconnected from apiserver. + var deletedObj interface{} + if n := oldItem.Newest(); n != nil { + deletedObj = n.Object + + // if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object + if d, ok := deletedObj.(DeletedFinalStateUnknown); ok { + deletedObj = d.Obj + } + } + queuedDeletions++ + if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { + return err + } + } + if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go index 902fcaedb1d..b8c36a46f7e 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go @@ -121,6 +121,113 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { } } + +func TestDeltaFIFOWithKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { + obj := mkFifoObj("foo", 2) + objV2 := mkFifoObj("foo", 3) + table := []struct { + name string + operations func(f *DeltaFIFO) + expectedDeltas Deltas + }{ + { + name: "Added object should be deleted on Replace", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Replaced object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.emitDeltaTypeReplaced = true + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Replaced, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Deleted object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + }, + }, + { + name: "Synced objects should have a single delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Sync, obj}, + {Sync, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added objects should have a single delete on multiple Replaces", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + f.Replace([]interface{}{}, "1") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added and deleted and added object should be deleted", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Add(objV2) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + {Added, objV2}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, + }, + }, + } + for _, tt := range table { + tt := tt + t.Run(tt.name, func(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{} + }), + }) + tt.operations(f) + actualDeltas := Pop(f) + if !reflect.DeepEqual(tt.expectedDeltas, actualDeltas) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltas) + } + }) + } +} + func TestDeltaFIFO_requeueOnPop(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) From 0bf0546d9f75d92c801e81c9f7adf040bba64102 Mon Sep 17 00:00:00 2001 From: Odin Ugedal Date: Fri, 10 Feb 2023 14:16:26 +0000 Subject: [PATCH 2/5] client-go/cache: fix missing delete event on replace without knownObjects This fixes an issue where a relist could result in a DELETED delta with an object wrapped in a DeletedFinalStateUnknown object; and then on the next relist, it would wrap that object inside another DeletedFinalStateUnknown, leaving the user with a "double" layer of DeletedFinalStateUnknown's. Signed-off-by: Odin Ugedal Signed-off-by: Odin Ugedal --- .../client-go/tools/cache/delta_fifo.go | 5 + .../client-go/tools/cache/delta_fifo_test.go | 102 ++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 7dd79f9eb28..bb86198952d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -613,6 +613,11 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { var deletedObj interface{} if n := oldItem.Newest(); n != nil { deletedObj = n.Object + + // if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object + if d, ok := deletedObj.(DeletedFinalStateUnknown); ok { + deletedObj = d.Obj + } } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go index b8c36a46f7e..341824746e0 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go @@ -121,6 +121,108 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { } } +func TestDeltaFIFOWithoutKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { + obj := mkFifoObj("foo", 2) + objV2 := mkFifoObj("foo", 3) + table := []struct { + name string + operations func(f *DeltaFIFO) + expectedDeltas Deltas + }{ + { + name: "Added object should be deleted on Replace", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Replaced object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.emitDeltaTypeReplaced = true + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Replaced, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Deleted object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + }, + }, + { + name: "Synced objects should have a single delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Sync, obj}, + {Sync, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added objects should have a single delete on multiple Replaces", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + f.Replace([]interface{}{}, "1") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added and deleted and added object should be deleted", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Add(objV2) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + {Added, objV2}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, + }, + }, + } + for _, tt := range table { + tt := tt + t.Run(tt.name, func(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + }) + tt.operations(f) + actualDeltas := Pop(f) + if !reflect.DeepEqual(tt.expectedDeltas, actualDeltas) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltas) + } + }) + } +} func TestDeltaFIFOWithKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { obj := mkFifoObj("foo", 2) From cd3e98b65c1339a8adc157175630de099a057d3f Mon Sep 17 00:00:00 2001 From: Odin Ugedal Date: Fri, 10 Feb 2023 14:30:10 +0000 Subject: [PATCH 3/5] client-go/cache: merge ReplaceMakesDeletionsForObjectsInQueue tests Signed-off-by: Odin Ugedal Signed-off-by: Odin Ugedal --- .../client-go/tools/cache/delta_fifo_test.go | 131 +++--------------- 1 file changed, 23 insertions(+), 108 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go index 341824746e0..ee69192a9db 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go @@ -121,7 +121,7 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { } } -func TestDeltaFIFOWithoutKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { +func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { obj := mkFifoObj("foo", 2) objV2 := mkFifoObj("foo", 3) table := []struct { @@ -211,120 +211,35 @@ func TestDeltaFIFOWithoutKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t * } for _, tt := range table { tt := tt - t.Run(tt.name, func(t *testing.T) { - f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KeyFunction: testFifoObjectKeyFunc, - }) - tt.operations(f) - actualDeltas := Pop(f) - if !reflect.DeepEqual(tt.expectedDeltas, actualDeltas) { - t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltas) - } - }) - } -} -func TestDeltaFIFOWithKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { - obj := mkFifoObj("foo", 2) - objV2 := mkFifoObj("foo", 3) - table := []struct { - name string - operations func(f *DeltaFIFO) - expectedDeltas Deltas - }{ - { - name: "Added object should be deleted on Replace", - operations: func(f *DeltaFIFO) { - f.Add(obj) - f.Replace([]interface{}{}, "0") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, - }, - }, - { - name: "Replaced object should have only a single Delete", - operations: func(f *DeltaFIFO) { - f.emitDeltaTypeReplaced = true - f.Add(obj) - f.Replace([]interface{}{obj}, "0") - f.Replace([]interface{}{}, "0") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Replaced, obj}, - {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, - }, - }, - { - name: "Deleted object should have only a single Delete", - operations: func(f *DeltaFIFO) { - f.Add(obj) - f.Delete(obj) - f.Replace([]interface{}{}, "0") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Deleted, obj}, - }, - }, - { - name: "Synced objects should have a single delete", - operations: func(f *DeltaFIFO) { - f.Add(obj) - f.Replace([]interface{}{obj}, "0") - f.Replace([]interface{}{obj}, "0") - f.Replace([]interface{}{}, "0") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Sync, obj}, - {Sync, obj}, - {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, - }, - }, - { - name: "Added objects should have a single delete on multiple Replaces", - operations: func(f *DeltaFIFO) { - f.Add(obj) - f.Replace([]interface{}{}, "0") - f.Replace([]interface{}{}, "1") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, - }, - }, - { - name: "Added and deleted and added object should be deleted", - operations: func(f *DeltaFIFO) { - f.Add(obj) - f.Delete(obj) - f.Add(objV2) - f.Replace([]interface{}{}, "0") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Deleted, obj}, - {Added, objV2}, - {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, - }, - }, - } - for _, tt := range table { - tt := tt t.Run(tt.name, func(t *testing.T) { - f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + // Test with a DeltaFIFO with a backing KnownObjects + fWithKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KeyFunction: testFifoObjectKeyFunc, KnownObjects: literalListerGetter(func() []testFifoObject { return []testFifoObject{} }), }) - tt.operations(f) - actualDeltas := Pop(f) - if !reflect.DeepEqual(tt.expectedDeltas, actualDeltas) { - t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltas) + tt.operations(fWithKnownObjects) + actualDeltasWithKnownObjects := Pop(fWithKnownObjects) + if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithKnownObjects) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithKnownObjects) + } + if len(fWithKnownObjects.items) != 0 { + t.Errorf("expected no extra deltas (empty map), got %#v", fWithKnownObjects.items) + } + + // Test with a DeltaFIFO without a backing KnownObjects + fWithoutKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + }) + tt.operations(fWithoutKnownObjects) + actualDeltasWithoutKnownObjects := Pop(fWithoutKnownObjects) + if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithoutKnownObjects) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithoutKnownObjects) + } + if len(fWithoutKnownObjects.items) != 0 { + t.Errorf("expected no extra deltas (empty map), got %#v", fWithoutKnownObjects.items) } }) } From 7bcc3e00fc28b2548886d04639a2e352ab37fb55 Mon Sep 17 00:00:00 2001 From: Odin Ugedal Date: Mon, 13 Feb 2023 11:12:37 +0000 Subject: [PATCH 4/5] client-go/cache: rewrite Replace to check queue first This is useful to both reduce the code complexity, and to ensure clients get the "newest" version of an object known when its deleted. This is all best-effort, but for clients it makes more sense giving them the newest object they observed rather than an old one. This is especially useful when an object is recreated. eg. Object A with key K is in the KnownObjects store; - DELETE delta for A is queued with key K - CREATE delta for B is queued with key K - Replace without any object with key K in it. In this situation its better to create a DELETE delta with DeletedFinalStateUnknown with B (with this patch), than it is to give the client an DeletedFinalStateUnknown with A (without this patch). Signed-off-by: Odin Ugedal Signed-off-by: Odin Ugedal --- .../client-go/tools/cache/delta_fifo.go | 88 ++++++------------- .../client-go/tools/cache/delta_fifo_test.go | 65 +++++++++++++- 2 files changed, 90 insertions(+), 63 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index bb86198952d..b83e9d2daf7 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -600,72 +600,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { } } - if f.knownObjects == nil { - // Do deletion detection against our own list. - queuedDeletions := 0 - for k, oldItem := range f.items { - if keys.Has(k) { - continue - } - // Delete pre-existing items not in the new list. - // This could happen if watch deletion event was missed while - // disconnected from apiserver. - var deletedObj interface{} - if n := oldItem.Newest(); n != nil { - deletedObj = n.Object - - // if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object - if d, ok := deletedObj.(DeletedFinalStateUnknown); ok { - deletedObj = d.Obj - } - } - queuedDeletions++ - if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { - return err - } - } - - if !f.populated { - f.populated = true - // While there shouldn't be any queued deletions in the initial - // population of the queue, it's better to be on the safe side. - f.initialPopulationCount = keys.Len() + queuedDeletions - } - - return nil - } - - // Detect deletions not already in the queue. - knownKeys := f.knownObjects.ListKeys() + // Do deletion detection against objects in the queue queuedDeletions := 0 - for _, k := range knownKeys { - if keys.Has(k) { - continue - } - - deletedObj, exists, err := f.knownObjects.GetByKey(k) - if err != nil { - deletedObj = nil - klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) - } else if !exists { - deletedObj = nil - klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) - } - queuedDeletions++ - if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { - return err - } - } - - // detect deletions for items in the queue that are not yet present in knownObjects for k, oldItem := range f.items { if keys.Has(k) { continue } - _, exists, _ := f.knownObjects.GetByKey(k) - if exists { - continue - } // Delete pre-existing items not in the new list. // This could happen if watch deletion event was missed while // disconnected from apiserver. @@ -684,6 +624,32 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { } } + if f.knownObjects != nil { + // Detect deletions for objects not present in the queue, but present in KnownObjects + knownKeys := f.knownObjects.ListKeys() + for _, k := range knownKeys { + if keys.Has(k) { + continue + } + if len(f.items[k]) > 0 { + continue + } + + deletedObj, exists, err := f.knownObjects.GetByKey(k) + if err != nil { + deletedObj = nil + klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) + } else if !exists { + deletedObj = nil + klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) + } + queuedDeletions++ + if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { + return err + } + } + } + if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go index ee69192a9db..9d124165711 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go @@ -121,7 +121,7 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { } } -func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { +func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { obj := mkFifoObj("foo", 2) objV2 := mkFifoObj("foo", 3) table := []struct { @@ -495,7 +495,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { expectedList = []Deltas{ {{Added, mkFifoObj("baz", 10)}, - {Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, {{Sync, mkFifoObj("foo", 5)}}, // Since "bar" didn't have a delete event and wasn't in the Replace list // it should get a tombstone key with the right Obj. @@ -509,6 +509,67 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } } + // Now try deleting and recreating the object in the queue, then delete it by a Replace call + f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + }) + f.Delete(mkFifoObj("bar", 6)) + f.Add(mkFifoObj("bar", 100)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList = []Deltas{ + { + {Deleted, mkFifoObj("bar", 6)}, + {Added, mkFifoObj("bar", 100)}, + // Since "bar" has a newer object in the queue than in the state, + // it should get a tombstone key with the latest object from the queue + {Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}, + }, + {{Sync, mkFifoObj("foo", 5)}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try syncing it first to ensure the delete use the latest version + f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + }) + f.Replace([]interface{}{mkFifoObj("bar", 100), mkFifoObj("foo", 5)}, "0") + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList = []Deltas{ + { + {Sync, mkFifoObj("bar", 100)}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}, + }, + { + {Sync, mkFifoObj("foo", 5)}, + {Sync, mkFifoObj("foo", 5)}, + }, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + // Now try starting without an explicit KeyListerGetter f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("baz", 10)) From 27f4bcae5c52a3bb88141f940ec23d907a15cde5 Mon Sep 17 00:00:00 2001 From: Odin Ugedal Date: Mon, 13 Feb 2023 11:23:50 +0000 Subject: [PATCH 5/5] client-go/cache: update Replace comment to be more clear Since the behavior is now changed, and the old behavior leaked objects, this adds a new comment about how Replace works. Signed-off-by: Odin Ugedal Signed-off-by: Odin Ugedal --- .../src/k8s.io/client-go/tools/cache/delta_fifo.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index b83e9d2daf7..1eb092f09aa 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -571,12 +571,11 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { // using the Sync or Replace DeltaType and then (2) it does some deletions. // In particular: for every pre-existing key K that is not the key of // an object in `list` there is the effect of -// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object -// of K. If `f.knownObjects == nil` then the pre-existing keys are -// those in `f.items` and the current object of K is the `.Newest()` -// of the Deltas associated with K. Otherwise the pre-existing keys -// are those listed by `f.knownObjects` and the current object of K is -// what `f.knownObjects.GetByKey(K)` returns. +// `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known +// object of K. The pre-existing keys are those in the union set of the keys in +// `f.items` and `f.knownObjects` (if not nil). The last known object for key K is +// the one present in the last delta in `f.items`. If there is no delta for K +// in `f.items`, it is the object in `f.knownObjects` func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { f.lock.Lock() defer f.lock.Unlock()