From 97cf9cb9c24f207c7c5251ad0a6f52c131769282 Mon Sep 17 00:00:00 2001 From: Odin Ugedal Date: Wed, 8 Feb 2023 14:57:23 +0000 Subject: [PATCH 1/6] client-go/cache: fix missing delete event on replace This fixes a race condition when a "short lived" object is created and the create event is still present on the queue when a relist replaces the state. Previously that would lead in the object being leaked. The way this could happen is roughly; 1. new Object is added O, agent gets CREATED event for it 2. watch is terminated, and the agent runs a new list, L 3. CREATE event for O is still on the queue to be processed. 4. informer replaces the old data in store with L, and O is not in L - Since O is not in the store, and not in the list L, no DELETED event is queued 5. CREATE event for O is still on the queue to be processed. 6. CREATE event for O is processed 7. O is ; its present in the cache but not in k8s. With this patch, on step 4. above it would create a DELETED event ensuring that the object will be removed. Signed-off-by: Odin Ugedal Signed-off-by: Odin Ugedal Kubernetes-commit: bd4ec0acec8844bddc7780d322f8fc215d045046 --- tools/cache/delta_fifo.go | 27 +++++++++ tools/cache/delta_fifo_test.go | 107 +++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 0c13a41f0..ee8133adf 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -647,6 +647,33 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { } } + // detect deletions for items in the queue that are not yet present in knownObjects + for k, oldItem := range f.items { + if keys.Has(k) { + continue + } + _, exists, _ := f.knownObjects.GetByKey(k) + if exists { + continue + } + // Delete pre-existing items not in the new list. + // This could happen if watch deletion event was missed while + // disconnected from apiserver. + var deletedObj interface{} + if n := oldItem.Newest(); n != nil { + deletedObj = n.Object + + // if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object + if d, ok := deletedObj.(DeletedFinalStateUnknown); ok { + deletedObj = d.Obj + } + } + queuedDeletions++ + if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { + return err + } + } + if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index f17240da5..0f813c2ca 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -121,6 +121,113 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { } } + +func TestDeltaFIFOWithKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { + obj := mkFifoObj("foo", 2) + objV2 := mkFifoObj("foo", 3) + table := []struct { + name string + operations func(f *DeltaFIFO) + expectedDeltas Deltas + }{ + { + name: "Added object should be deleted on Replace", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Replaced object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.emitDeltaTypeReplaced = true + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Replaced, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Deleted object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + }, + }, + { + name: "Synced objects should have a single delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Sync, obj}, + {Sync, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added objects should have a single delete on multiple Replaces", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + f.Replace([]interface{}{}, "1") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added and deleted and added object should be deleted", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Add(objV2) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + {Added, objV2}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, + }, + }, + } + for _, tt := range table { + tt := tt + t.Run(tt.name, func(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{} + }), + }) + tt.operations(f) + actualDeltas := Pop(f) + if !reflect.DeepEqual(tt.expectedDeltas, actualDeltas) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltas) + } + }) + } +} + func TestDeltaFIFO_requeueOnPop(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) From ba3596940d8109f8a59c6ab8c17acda404da2be0 Mon Sep 17 00:00:00 2001 From: Odin Ugedal Date: Fri, 10 Feb 2023 14:16:26 +0000 Subject: [PATCH 2/6] client-go/cache: fix missing delete event on replace without knownObjects This fixes an issue where a relist could result in a DELETED delta with an object wrapped in a DeletedFinalStateUnknown object; and then on the next relist, it would wrap that object inside another DeletedFinalStateUnknown, leaving the user with a "double" layer of DeletedFinalStateUnknown's. Signed-off-by: Odin Ugedal Signed-off-by: Odin Ugedal Kubernetes-commit: 8509d70d3c33a038f0b5111a5e5696c833f6685b --- tools/cache/delta_fifo.go | 5 ++ tools/cache/delta_fifo_test.go | 102 +++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index ee8133adf..d525182d0 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -608,6 +608,11 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { var deletedObj interface{} if n := oldItem.Newest(); n != nil { deletedObj = n.Object + + // if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object + if d, ok := deletedObj.(DeletedFinalStateUnknown); ok { + deletedObj = d.Obj + } } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 0f813c2ca..7aa3d3d31 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -121,6 +121,108 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { } } +func TestDeltaFIFOWithoutKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { + obj := mkFifoObj("foo", 2) + objV2 := mkFifoObj("foo", 3) + table := []struct { + name string + operations func(f *DeltaFIFO) + expectedDeltas Deltas + }{ + { + name: "Added object should be deleted on Replace", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Replaced object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.emitDeltaTypeReplaced = true + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Replaced, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Deleted object should have only a single Delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + }, + }, + { + name: "Synced objects should have a single delete", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Sync, obj}, + {Sync, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added objects should have a single delete on multiple Replaces", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + f.Replace([]interface{}{}, "1") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added and deleted and added object should be deleted", + operations: func(f *DeltaFIFO) { + f.Add(obj) + f.Delete(obj) + f.Add(objV2) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + {Added, objV2}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, + }, + }, + } + for _, tt := range table { + tt := tt + t.Run(tt.name, func(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + }) + tt.operations(f) + actualDeltas := Pop(f) + if !reflect.DeepEqual(tt.expectedDeltas, actualDeltas) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltas) + } + }) + } +} func TestDeltaFIFOWithKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { obj := mkFifoObj("foo", 2) From 30215cd5a17c9f7b4a303201c1b91c61d82f3bc0 Mon Sep 17 00:00:00 2001 From: Odin Ugedal Date: Fri, 10 Feb 2023 14:30:10 +0000 Subject: [PATCH 3/6] client-go/cache: merge ReplaceMakesDeletionsForObjectsInQueue tests Signed-off-by: Odin Ugedal Signed-off-by: Odin Ugedal Kubernetes-commit: d7878cdf2d6a7ec82b589aa95fd83770ba3edf2d --- tools/cache/delta_fifo_test.go | 131 ++++++--------------------------- 1 file changed, 23 insertions(+), 108 deletions(-) diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 7aa3d3d31..896058c1c 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -121,7 +121,7 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { } } -func TestDeltaFIFOWithoutKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { +func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { obj := mkFifoObj("foo", 2) objV2 := mkFifoObj("foo", 3) table := []struct { @@ -211,120 +211,35 @@ func TestDeltaFIFOWithoutKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t * } for _, tt := range table { tt := tt - t.Run(tt.name, func(t *testing.T) { - f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KeyFunction: testFifoObjectKeyFunc, - }) - tt.operations(f) - actualDeltas := Pop(f) - if !reflect.DeepEqual(tt.expectedDeltas, actualDeltas) { - t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltas) - } - }) - } -} -func TestDeltaFIFOWithKnownObjects_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { - obj := mkFifoObj("foo", 2) - objV2 := mkFifoObj("foo", 3) - table := []struct { - name string - operations func(f *DeltaFIFO) - expectedDeltas Deltas - }{ - { - name: "Added object should be deleted on Replace", - operations: func(f *DeltaFIFO) { - f.Add(obj) - f.Replace([]interface{}{}, "0") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, - }, - }, - { - name: "Replaced object should have only a single Delete", - operations: func(f *DeltaFIFO) { - f.emitDeltaTypeReplaced = true - f.Add(obj) - f.Replace([]interface{}{obj}, "0") - f.Replace([]interface{}{}, "0") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Replaced, obj}, - {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, - }, - }, - { - name: "Deleted object should have only a single Delete", - operations: func(f *DeltaFIFO) { - f.Add(obj) - f.Delete(obj) - f.Replace([]interface{}{}, "0") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Deleted, obj}, - }, - }, - { - name: "Synced objects should have a single delete", - operations: func(f *DeltaFIFO) { - f.Add(obj) - f.Replace([]interface{}{obj}, "0") - f.Replace([]interface{}{obj}, "0") - f.Replace([]interface{}{}, "0") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Sync, obj}, - {Sync, obj}, - {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, - }, - }, - { - name: "Added objects should have a single delete on multiple Replaces", - operations: func(f *DeltaFIFO) { - f.Add(obj) - f.Replace([]interface{}{}, "0") - f.Replace([]interface{}{}, "1") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, - }, - }, - { - name: "Added and deleted and added object should be deleted", - operations: func(f *DeltaFIFO) { - f.Add(obj) - f.Delete(obj) - f.Add(objV2) - f.Replace([]interface{}{}, "0") - }, - expectedDeltas: Deltas{ - {Added, obj}, - {Deleted, obj}, - {Added, objV2}, - {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, - }, - }, - } - for _, tt := range table { - tt := tt t.Run(tt.name, func(t *testing.T) { - f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + // Test with a DeltaFIFO with a backing KnownObjects + fWithKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KeyFunction: testFifoObjectKeyFunc, KnownObjects: literalListerGetter(func() []testFifoObject { return []testFifoObject{} }), }) - tt.operations(f) - actualDeltas := Pop(f) - if !reflect.DeepEqual(tt.expectedDeltas, actualDeltas) { - t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltas) + tt.operations(fWithKnownObjects) + actualDeltasWithKnownObjects := Pop(fWithKnownObjects) + if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithKnownObjects) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithKnownObjects) + } + if len(fWithKnownObjects.items) != 0 { + t.Errorf("expected no extra deltas (empty map), got %#v", fWithKnownObjects.items) + } + + // Test with a DeltaFIFO without a backing KnownObjects + fWithoutKnownObjects := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + }) + tt.operations(fWithoutKnownObjects) + actualDeltasWithoutKnownObjects := Pop(fWithoutKnownObjects) + if !reflect.DeepEqual(tt.expectedDeltas, actualDeltasWithoutKnownObjects) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualDeltasWithoutKnownObjects) + } + if len(fWithoutKnownObjects.items) != 0 { + t.Errorf("expected no extra deltas (empty map), got %#v", fWithoutKnownObjects.items) } }) } From b667227efd48a97f6fab768b99fc0d39c3e69249 Mon Sep 17 00:00:00 2001 From: Odin Ugedal Date: Mon, 13 Feb 2023 11:12:37 +0000 Subject: [PATCH 4/6] client-go/cache: rewrite Replace to check queue first This is useful to both reduce the code complexity, and to ensure clients get the "newest" version of an object known when its deleted. This is all best-effort, but for clients it makes more sense giving them the newest object they observed rather than an old one. This is especially useful when an object is recreated. eg. Object A with key K is in the KnownObjects store; - DELETE delta for A is queued with key K - CREATE delta for B is queued with key K - Replace without any object with key K in it. In this situation its better to create a DELETE delta with DeletedFinalStateUnknown with B (with this patch), than it is to give the client an DeletedFinalStateUnknown with A (without this patch). Signed-off-by: Odin Ugedal Signed-off-by: Odin Ugedal Kubernetes-commit: 4f55d416f2e6b566eb397670b451d96712e638f1 --- tools/cache/delta_fifo.go | 88 +++++++++++----------------------- tools/cache/delta_fifo_test.go | 65 ++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 63 deletions(-) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index d525182d0..35a679615 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -595,72 +595,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { } } - if f.knownObjects == nil { - // Do deletion detection against our own list. - queuedDeletions := 0 - for k, oldItem := range f.items { - if keys.Has(k) { - continue - } - // Delete pre-existing items not in the new list. - // This could happen if watch deletion event was missed while - // disconnected from apiserver. - var deletedObj interface{} - if n := oldItem.Newest(); n != nil { - deletedObj = n.Object - - // if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object - if d, ok := deletedObj.(DeletedFinalStateUnknown); ok { - deletedObj = d.Obj - } - } - queuedDeletions++ - if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { - return err - } - } - - if !f.populated { - f.populated = true - // While there shouldn't be any queued deletions in the initial - // population of the queue, it's better to be on the safe side. - f.initialPopulationCount = keys.Len() + queuedDeletions - } - - return nil - } - - // Detect deletions not already in the queue. - knownKeys := f.knownObjects.ListKeys() + // Do deletion detection against objects in the queue queuedDeletions := 0 - for _, k := range knownKeys { - if keys.Has(k) { - continue - } - - deletedObj, exists, err := f.knownObjects.GetByKey(k) - if err != nil { - deletedObj = nil - klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) - } else if !exists { - deletedObj = nil - klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) - } - queuedDeletions++ - if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { - return err - } - } - - // detect deletions for items in the queue that are not yet present in knownObjects for k, oldItem := range f.items { if keys.Has(k) { continue } - _, exists, _ := f.knownObjects.GetByKey(k) - if exists { - continue - } // Delete pre-existing items not in the new list. // This could happen if watch deletion event was missed while // disconnected from apiserver. @@ -679,6 +619,32 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { } } + if f.knownObjects != nil { + // Detect deletions for objects not present in the queue, but present in KnownObjects + knownKeys := f.knownObjects.ListKeys() + for _, k := range knownKeys { + if keys.Has(k) { + continue + } + if len(f.items[k]) > 0 { + continue + } + + deletedObj, exists, err := f.knownObjects.GetByKey(k) + if err != nil { + deletedObj = nil + klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) + } else if !exists { + deletedObj = nil + klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) + } + queuedDeletions++ + if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { + return err + } + } + } + if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 896058c1c..8129705bb 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -121,7 +121,7 @@ func TestDeltaFIFO_replaceWithDeleteDeltaIn(t *testing.T) { } } -func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsInQueue(t *testing.T) { +func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { obj := mkFifoObj("foo", 2) objV2 := mkFifoObj("foo", 3) table := []struct { @@ -495,7 +495,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { expectedList = []Deltas{ {{Added, mkFifoObj("baz", 10)}, - {Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, {{Sync, mkFifoObj("foo", 5)}}, // Since "bar" didn't have a delete event and wasn't in the Replace list // it should get a tombstone key with the right Obj. @@ -509,6 +509,67 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } } + // Now try deleting and recreating the object in the queue, then delete it by a Replace call + f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + }) + f.Delete(mkFifoObj("bar", 6)) + f.Add(mkFifoObj("bar", 100)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList = []Deltas{ + { + {Deleted, mkFifoObj("bar", 6)}, + {Added, mkFifoObj("bar", 100)}, + // Since "bar" has a newer object in the queue than in the state, + // it should get a tombstone key with the latest object from the queue + {Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}, + }, + {{Sync, mkFifoObj("foo", 5)}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try syncing it first to ensure the delete use the latest version + f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + }) + f.Replace([]interface{}{mkFifoObj("bar", 100), mkFifoObj("foo", 5)}, "0") + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList = []Deltas{ + { + {Sync, mkFifoObj("bar", 100)}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}, + }, + { + {Sync, mkFifoObj("foo", 5)}, + {Sync, mkFifoObj("foo", 5)}, + }, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + // Now try starting without an explicit KeyListerGetter f = NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("baz", 10)) From 8190aa4d37862d3705b25e4fcd986176a85f385c Mon Sep 17 00:00:00 2001 From: Odin Ugedal Date: Mon, 13 Feb 2023 11:23:50 +0000 Subject: [PATCH 5/6] client-go/cache: update Replace comment to be more clear Since the behavior is now changed, and the old behavior leaked objects, this adds a new comment about how Replace works. Signed-off-by: Odin Ugedal Signed-off-by: Odin Ugedal Kubernetes-commit: cd7deae436c328085bcb50681b06e1cc275801db --- tools/cache/delta_fifo.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 35a679615..b05ae03f4 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -566,12 +566,11 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { // using the Sync or Replace DeltaType and then (2) it does some deletions. // In particular: for every pre-existing key K that is not the key of // an object in `list` there is the effect of -// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object -// of K. If `f.knownObjects == nil` then the pre-existing keys are -// those in `f.items` and the current object of K is the `.Newest()` -// of the Deltas associated with K. Otherwise the pre-existing keys -// are those listed by `f.knownObjects` and the current object of K is -// what `f.knownObjects.GetByKey(K)` returns. +// `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known +// object of K. The pre-existing keys are those in the union set of the keys in +// `f.items` and `f.knownObjects` (if not nil). The last known object for key K is +// the one present in the last delta in `f.items`. If there is no delta for K +// in `f.items`, it is the object in `f.knownObjects` func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { f.lock.Lock() defer f.lock.Unlock() From fffc68d58e65d023e9783824b1c0fe1c2468c11e Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 14 Mar 2023 23:05:20 +0000 Subject: [PATCH 6/6] Change where transformers are called. odinuge: sorted out some function signature changes during cherry-picking that caused conflicts. (cherry picked from commit e76dff38cf74c3c8ad9ed4d3bc6e3641d9b64565) Signed-off-by: Odin Ugedal Kubernetes-commit: a8d2bc0ff7537bcb17e0b85333615dafd7c1e9a9 --- tools/cache/controller.go | 22 +------- tools/cache/controller_test.go | 4 +- tools/cache/delta_fifo.go | 44 ++++++++++++++++ tools/cache/delta_fifo_test.go | 82 +++++++++++++++++++++++++++++ tools/cache/shared_informer.go | 8 ++- tools/cache/shared_informer_test.go | 5 +- 6 files changed, 135 insertions(+), 30 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index 0762da3be..96005ff58 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -353,17 +353,6 @@ func NewIndexerInformer( return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil) } -// TransformFunc allows for transforming an object before it will be processed -// and put into the controller cache and before the corresponding handlers will -// be called on it. -// TransformFunc (similarly to ResourceEventHandler functions) should be able -// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown -// -// The most common usage pattern is to clean-up some parts of the object to -// reduce component memory usage if a given component doesn't care about them. -// given controller doesn't care for them -type TransformFunc func(interface{}) (interface{}, error) - // NewTransformingInformer returns a Store and a controller for populating // the store while also providing event notifications. You should only used // the returned Store for Get/List operations; Add/Modify/Deletes will cause @@ -411,19 +400,11 @@ func processDeltas( // Object which receives event notifications from the given deltas handler ResourceEventHandler, clientState Store, - transformer TransformFunc, deltas Deltas, ) error { // from oldest to newest for _, d := range deltas { obj := d.Object - if transformer != nil { - var err error - obj, err = transformer(obj) - if err != nil { - return err - } - } switch d.Type { case Sync, Replaced, Added, Updated: @@ -475,6 +456,7 @@ func newInformer( fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: clientState, EmitDeltaTypeReplaced: true, + Transformer: transformer, }) cfg := &Config{ @@ -486,7 +468,7 @@ func newInformer( Process: func(obj interface{}) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(h, clientState, transformer, deltas) + return processDeltas(h, clientState, deltas) } return errors.New("object given as Process argument is not Deltas") }, diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index cf42478e0..ca4ba63ce 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -23,7 +23,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -32,7 +32,7 @@ import ( "k8s.io/apimachinery/pkg/watch" fcache "k8s.io/client-go/tools/cache/testing" - "github.com/google/gofuzz" + fuzz "github.com/google/gofuzz" ) func Example() { diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index b05ae03f4..84f3ab9ca 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -51,6 +51,10 @@ type DeltaFIFOOptions struct { // When true, `Replaced` events will be sent for items passed to a Replace() call. // When false, `Sync` events will be sent instead. EmitDeltaTypeReplaced bool + + // If set, will be called for objects before enqueueing them. Please + // see the comment on TransformFunc for details. + Transformer TransformFunc } // DeltaFIFO is like FIFO, but differs in two ways. One is that the @@ -129,8 +133,32 @@ type DeltaFIFO struct { // emitDeltaTypeReplaced is whether to emit the Replaced or Sync // DeltaType when Replace() is called (to preserve backwards compat). emitDeltaTypeReplaced bool + + // Called with every object if non-nil. + transformer TransformFunc } +// TransformFunc allows for transforming an object before it will be processed. +// TransformFunc (similarly to ResourceEventHandler functions) should be able +// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown. +// +// New in v1.27: In such cases, the contained object will already have gone +// through the transform object separately (when it was added / updated prior +// to the delete), so the TransformFunc can likely safely ignore such objects +// (i.e., just return the input object). +// +// The most common usage pattern is to clean-up some parts of the object to +// reduce component memory usage if a given component doesn't care about them. +// +// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc +// sees the object before any other actor, and it is now safe to mutate the +// object in place instead of making a copy. +// +// Note that TransformFunc is called while inserting objects into the +// notification queue and is therefore extremely performance sensitive; please +// do not do anything that will take a long time. +type TransformFunc func(interface{}) (interface{}, error) + // DeltaType is the type of a change (addition, deletion, etc) type DeltaType string @@ -227,6 +255,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { knownObjects: opts.KnownObjects, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, + transformer: opts.Transformer, } f.cond.L = &f.lock return f @@ -411,6 +440,21 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err if err != nil { return KeyError{obj, err} } + + // Every object comes through this code path once, so this is a good + // place to call the transform func. If obj is a + // DeletedFinalStateUnknown tombstone, then the containted inner object + // will already have gone through the transformer, but we document that + // this can happen. In cases involving Replace(), such an object can + // come through multiple times. + if f.transformer != nil { + var err error + obj, err = f.transformer(obj) + if err != nil { + return err + } + } + oldDeltas := f.items[id] newDeltas := append(oldDeltas, Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 8129705bb..a7054e69c 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -327,6 +327,88 @@ func TestDeltaFIFO_addUpdate(t *testing.T) { } } +type rvAndXfrm struct { + rv int + xfrm int +} + +func TestDeltaFIFO_transformer(t *testing.T) { + mk := func(name string, rv int) testFifoObject { + return mkFifoObj(name, &rvAndXfrm{rv, 0}) + } + xfrm := TransformFunc(func(obj interface{}) (interface{}, error) { + switch v := obj.(type) { + case testFifoObject: + v.val.(*rvAndXfrm).xfrm++ + case DeletedFinalStateUnknown: + if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 { + return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj) + } + default: + return nil, fmt.Errorf("unexpected object: %#v", obj) + } + return obj, nil + }) + + must := func(err error) { + if err != nil { + t.Fatal(err) + } + } + + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + Transformer: xfrm, + }) + must(f.Add(mk("foo", 10))) + must(f.Add(mk("bar", 11))) + must(f.Update(mk("foo", 12))) + must(f.Delete(mk("foo", 15))) + must(f.Replace([]interface{}{}, "")) + must(f.Add(mk("bar", 16))) + must(f.Replace([]interface{}{}, "")) + + // Should be empty + if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %+v, got %+v", e, a) + } + + for i := 0; i < 2; i++ { + obj, err := f.Pop(func(o interface{}) error { return nil }) + if err != nil { + t.Fatalf("got nothing on try %v?", i) + } + obj = obj.(Deltas).Newest().Object + switch v := obj.(type) { + case testFifoObject: + if v.name != "foo" { + t.Errorf("expected regular deletion of foo, got %q", v.name) + } + rx := v.val.(*rvAndXfrm) + if rx.rv != 15 { + t.Errorf("expected last message, got %#v", obj) + } + if rx.xfrm != 1 { + t.Errorf("obj %v transformed wrong number of times.", obj) + } + case DeletedFinalStateUnknown: + tf := v.Obj.(testFifoObject) + rx := tf.val.(*rvAndXfrm) + if tf.name != "bar" { + t.Errorf("expected tombstone deletion of bar, got %q", tf.name) + } + if rx.rv != 16 { + t.Errorf("expected last message, got %#v", obj) + } + if rx.xfrm != 1 { + t.Errorf("tombstoned obj %v transformed wrong number of times.", obj) + } + default: + t.Errorf("unknown item %#v", obj) + } + } +} + func TestDeltaFIFO_enqueueingNoLister(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("foo", 10)) diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index f5c7316a1..4979642ce 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -198,10 +198,7 @@ type SharedInformer interface { // // Must be set before starting the informer. // - // Note: Since the object given to the handler may be already shared with - // other goroutines, it is advisable to copy the object being - // transform before mutating it at all and returning the copy to prevent - // data races. + // Please see the comment on TransformFunc for more details. SetTransform(handler TransformFunc) error // IsStopped reports whether the informer has already been stopped. @@ -422,6 +419,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, + Transformer: s.transform, }) cfg := &Config{ @@ -585,7 +583,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { defer s.blockDeltas.Unlock() if deltas, ok := obj.(Deltas); ok { - return processDeltas(s, s.indexer, s.transform, deltas) + return processDeltas(s, s.indexer, deltas) } return errors.New("object given as Process argument is not Deltas") } diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 2676e8f54..515fc22f5 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -395,9 +395,8 @@ func TestSharedInformerTransformer(t *testing.T) { name := pod.GetName() if upper := strings.ToUpper(name); upper != name { - copied := pod.DeepCopyObject().(*v1.Pod) - copied.SetName(upper) - return copied, nil + pod.SetName(upper) + return pod, nil } } return obj, nil