diff --git a/tools/cache/the_real_fifo.go b/tools/cache/the_real_fifo.go new file mode 100644 index 00000000..9be14ff3 --- /dev/null +++ b/tools/cache/the_real_fifo.go @@ -0,0 +1,407 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + utiltrace "k8s.io/utils/trace" + "sync" + "time" +) + +// RealFIFO is a Queue in which every notification from the Reflector is passed +// in order to the Queue via Pop. +// This means that it +// 1. delivers notifications for items that have been deleted +// 2. delivers multiple notifications per item instead of simply the most recent value +type RealFIFO struct { + lock sync.RWMutex + cond sync.Cond + + items []Delta + + // populated is true if the first batch of items inserted by Replace() has been populated + // or Delete/Add/Update was called first. + populated bool + // initialPopulationCount is the number of items inserted by the first call of Replace() + initialPopulationCount int + + // keyFunc is used to make the key used for queued item insertion and retrieval, and + // should be deterministic. + keyFunc KeyFunc + + // knownObjects list keys that are "known" --- affecting Delete(), + // Replace(), and Resync() + knownObjects KeyListerGetter + + // Indication the queue is closed. + // Used to indicate a queue is closed so a control loop can exit when a queue is empty. + // Currently, not used to gate any of CRUD operations. + closed bool + + // Called with every object if non-nil. + transformer TransformFunc +} + +var ( + _ = Queue(&RealFIFO{}) // RealFIFO is a Queue +) + +// Close the queue. +func (f *RealFIFO) Close() { + f.lock.Lock() + defer f.lock.Unlock() + f.closed = true + f.cond.Broadcast() +} + +// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or +// DeletedFinalStateUnknown objects. +func (f *RealFIFO) keyOf(obj interface{}) (string, error) { + if d, ok := obj.(Deltas); ok { + if len(d) == 0 { + return "", KeyError{obj, ErrZeroLengthDeltasObject} + } + obj = d.Newest().Object + } + if d, ok := obj.(Delta); ok { + obj = d.Object + } + if d, ok := obj.(DeletedFinalStateUnknown); ok { + return d.Key, nil + } + return f.keyFunc(obj) +} + +// HasSynced returns true if an Add/Update/Delete are called first, +// or the first batch of items inserted by Replace() has been popped. +func (f *RealFIFO) HasSynced() bool { + f.lock.Lock() + defer f.lock.Unlock() + return f.hasSynced_locked() +} + +// ignoring lint to reduce delta to the original for review. It's ok adjust later. +// +//lint:file-ignore ST1003: should not use underscores in Go names +func (f *RealFIFO) hasSynced_locked() bool { + return f.populated && f.initialPopulationCount == 0 +} + +// addToItems_locked appends to the delta list. +func (f *RealFIFO) addToItems_locked(deltaActionType DeltaType, skipTransform bool, obj interface{}) error { + // we must be able to read the keys in order to determine whether the knownObjcts and the items + // in this FIFO overlap + _, err := f.keyOf(obj) + if err != nil { + return KeyError{obj, err} + } + + // Every object comes through this code path once, so this is a good + // place to call the transform func. + // + // If obj is a DeletedFinalStateUnknown tombstone or the action is a Sync, + // then the object have already gone through the transformer. + // + // If the objects already present in the cache are passed to Replace(), + // the transformer must be idempotent to avoid re-mutating them, + // or coordinate with all readers from the cache to avoid data races. + // Default informers do not pass existing objects to Replace. + if f.transformer != nil { + _, isTombstone := obj.(DeletedFinalStateUnknown) + if !isTombstone && !skipTransform { + var err error + obj, err = f.transformer(obj) + if err != nil { + return err + } + } + } + + f.items = append(f.items, Delta{ + Type: deltaActionType, + Object: obj, + }) + f.cond.Broadcast() + + return nil +} + +// Add inserts an item, and puts it in the queue. The item is only enqueued +// if it doesn't already exist in the set. +func (f *RealFIFO) Add(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + + f.populated = true + retErr := f.addToItems_locked(Added, false, obj) + + return retErr +} + +// Update is the same as Add in this implementation. +func (f *RealFIFO) Update(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + + f.populated = true + retErr := f.addToItems_locked(Updated, false, obj) + + return retErr +} + +// Delete removes an item. It doesn't add it to the queue, because +// this implementation assumes the consumer only cares about the objects, +// not the order in which they were created/added. +func (f *RealFIFO) Delete(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + + f.populated = true + retErr := f.addToItems_locked(Deleted, false, obj) + + return retErr +} + +// IsClosed checks if the queue is closed +func (f *RealFIFO) IsClosed() bool { + f.lock.Lock() + defer f.lock.Unlock() + return f.closed +} + +// Pop waits until an item is ready and processes it. If multiple items are +// ready, they are returned in the order in which they were added/updated. +// The item is removed from the queue (and the store) before it is processed. +// process function is called under lock, so it is safe +// update data structures in it that need to be in sync with the queue. +func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { + f.lock.Lock() + defer f.lock.Unlock() + + for len(f.items) == 0 { + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. + // When Close() is called, the f.closed is set and the condition is broadcasted. + // Which causes this loop to continue and return from the Pop(). + if f.closed { + return nil, ErrFIFOClosed + } + + f.cond.Wait() + } + + isInInitialList := !f.hasSynced_locked() + item := f.items[0] + // The underlying array still exists and references this object, so the object will not be garbage collected unless we zero the reference. + f.items[0] = Delta{} + f.items = f.items[1:] + if f.initialPopulationCount > 0 { + f.initialPopulationCount-- + } + + // Only log traces if the queue depth is greater than 10 and it takes more than + // 100 milliseconds to process one item from the queue. + // Queue depth never goes high because processing an item is locking the queue, + // and new items can't be added until processing finish. + // https://github.com/kubernetes/kubernetes/issues/103789 + if len(f.items) > 10 { + id, _ := f.keyOf(item) + trace := utiltrace.New("RealFIFO Pop Process", + utiltrace.Field{Key: "ID", Value: id}, + utiltrace.Field{Key: "Depth", Value: len(f.items)}, + utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"}) + defer trace.LogIfLong(100 * time.Millisecond) + } + + // we wrap in Deltas here to be compatible with preview Pop functions and those interpreting the return value. + err := process(Deltas{item}, isInInitialList) + return Deltas{item}, err +} + +// Replace +// 1. finds those items in f.items that are not in newItems and creates synthetic deletes for them +// 2. finds items in knownObjects that are not in newItems and creates synthetic deletes for them +// 3. adds the newItems to the queue +func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error { + f.lock.Lock() + defer f.lock.Unlock() + + // determine the keys of everything we're adding. We cannot add the items until after the synthetic deletes have been + // created for items that don't existing in newItems + newKeys := sets.Set[string]{} + for _, obj := range newItems { + key, err := f.keyOf(obj) + if err != nil { + return KeyError{obj, err} + } + newKeys.Insert(key) + } + + queuedItems := f.items + queuedKeys := []string{} + lastQueuedItemForKey := map[string]Delta{} + for _, queuedItem := range queuedItems { + queuedKey, err := f.keyOf(queuedItem.Object) + if err != nil { + return KeyError{queuedItem.Object, err} + } + + if _, seen := lastQueuedItemForKey[queuedKey]; !seen { + queuedKeys = append(queuedKeys, queuedKey) + } + lastQueuedItemForKey[queuedKey] = queuedItem + } + + // all the deletes already in the queue are important. There are two cases + // 1. queuedItems has delete for key/X and newItems has replace for key/X. This means the queued UID was deleted and a new one was created. + // 2. queuedItems has a delete for key/X and newItems does NOT have key/X. This means the queued item was deleted. + // Do deletion detection against objects in the queue. + for _, queuedKey := range queuedKeys { + if newKeys.Has(queuedKey) { + continue + } + + // Delete pre-existing items not in the new list. + // This could happen if watch deletion event was missed while + // disconnected from apiserver. + lastQueuedItem := lastQueuedItemForKey[queuedKey] + // if we've already got the item marked as deleted, no need to add another delete + if lastQueuedItem.Type == Deleted { + continue + } + + // if we got here, then the last entry we have for the queued item is *not* a deletion and we need to add a delete + deletedObj := lastQueuedItem.Object + + retErr := f.addToItems_locked(Deleted, true, DeletedFinalStateUnknown{ + Key: queuedKey, + Obj: deletedObj, + }) + if retErr != nil { + return fmt.Errorf("couldn't enqueue object: %w", retErr) + } + } + + // Detect deletions for objects not present in the queue, but present in KnownObjects + knownKeys := f.knownObjects.ListKeys() + for _, knownKey := range knownKeys { + if newKeys.Has(knownKey) { // still present + continue + } + if _, inQueuedItems := lastQueuedItemForKey[knownKey]; inQueuedItems { // already added delete for these + continue + } + + deletedObj, exists, err := f.knownObjects.GetByKey(knownKey) + if err != nil { + deletedObj = nil + utilruntime.HandleError(fmt.Errorf("error during lookup, placing DeleteFinalStateUnknown marker without object: key=%q, err=%w", knownKey, err)) + } else if !exists { + deletedObj = nil + utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object: key=%q", knownKey)) + } + retErr := f.addToItems_locked(Deleted, false, DeletedFinalStateUnknown{ + Key: knownKey, + Obj: deletedObj, + }) + if retErr != nil { + return fmt.Errorf("couldn't enqueue object: %w", retErr) + } + } + + // now that we have the deletes we need for items, we can add the newItems to the items queue + for _, obj := range newItems { + retErr := f.addToItems_locked(Replaced, false, obj) + if retErr != nil { + return fmt.Errorf("couldn't enqueue object: %w", retErr) + } + } + + if !f.populated { + f.populated = true + f.initialPopulationCount = len(f.items) + } + + return nil +} + +// Resync will ensure that every object in the Store has its key in the queue. +// This should be a no-op, because that property is maintained by all operations. +func (f *RealFIFO) Resync() error { + // TODO this cannot logically be done by the FIFO, it can only be done by the indexer + f.lock.Lock() + defer f.lock.Unlock() + + if f.knownObjects == nil { + return nil + } + + keysInQueue := sets.Set[string]{} + for _, item := range f.items { + key, err := f.keyOf(item.Object) + if err != nil { + return KeyError{item, err} + } + keysInQueue.Insert(key) + } + + knownKeys := f.knownObjects.ListKeys() + for _, knownKey := range knownKeys { + // If we are doing Resync() and there is already an event queued for that object, + // we ignore the Resync for it. This is to avoid the race, in which the resync + // comes with the previous value of object (since queueing an event for the object + // doesn't trigger changing the underlying store . + if keysInQueue.Has(knownKey) { + continue + } + + knownObj, exists, err := f.knownObjects.GetByKey(knownKey) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to queue object for sync: key=%q, err=%w", knownKey, err)) + continue + } else if !exists { + utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, unable to queue object for sync: key=%q", knownKey)) + continue + } + + retErr := f.addToItems_locked(Sync, true, knownObj) + if retErr != nil { + return fmt.Errorf("couldn't queue object: %w", err) + } + } + + return nil +} + +// NewRealFIFO returns a Store which can be used to queue up items to +// process. +func NewRealFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter, transformer TransformFunc) *RealFIFO { + if knownObjects == nil { + panic("coding error: knownObjects must be provided") + } + + f := &RealFIFO{ + items: make([]Delta, 0, 10), + keyFunc: keyFunc, + knownObjects: knownObjects, + transformer: transformer, + } + f.cond.L = &f.lock + return f +} diff --git a/tools/cache/the_real_fifo_test.go b/tools/cache/the_real_fifo_test.go new file mode 100644 index 00000000..649ea368 --- /dev/null +++ b/tools/cache/the_real_fifo_test.go @@ -0,0 +1,976 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "reflect" + "runtime" + "testing" + "time" +) + +func (f *RealFIFO) getItems() []Delta { + f.lock.Lock() + defer f.lock.Unlock() + + ret := make([]Delta, len(f.items)) + copy(ret, f.items) + return ret +} + +const closedFIFOName = "FIFO WAS CLOSED" + +func popN(queue Queue, count int) []interface{} { + result := []interface{}{} + for i := 0; i < count; i++ { + queue.Pop(func(obj interface{}, isInInitialList bool) error { + result = append(result, obj) + return nil + }) + } + return result +} + +// helper function to reduce stuttering +func testRealFIFOPop(f *RealFIFO) testFifoObject { + val := Pop(f) + if val == nil { + return testFifoObject{name: closedFIFOName} + } + return val.(Deltas).Newest().Object.(testFifoObject) +} + +func emptyKnownObjects() KeyListerGetter { + return literalListerGetter( + func() []testFifoObject { + return []testFifoObject{} + }, + ) +} + +func TestRealFIFO_basic(t *testing.T) { + f := NewRealFIFO(testFifoObjectKeyFunc, emptyKnownObjects(), nil) + const amount = 500 + go func() { + for i := 0; i < amount; i++ { + f.Add(mkFifoObj(string([]rune{'a', rune(i)}), i+1)) + } + }() + go func() { + for u := uint64(0); u < amount; u++ { + f.Add(mkFifoObj(string([]rune{'b', rune(u)}), u+1)) + } + }() + + lastInt := int(0) + lastUint := uint64(0) + for i := 0; i < amount*2; i++ { + switch obj := testRealFIFOPop(f).val.(type) { + case int: + if obj <= lastInt { + t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) + } + lastInt = obj + case uint64: + if obj <= lastUint { + t.Errorf("got %v (uint) out of order, last was %v", obj, lastUint) + } else { + lastUint = obj + } + default: + t.Fatalf("unexpected type %#v", obj) + } + } +} + +// TestRealFIFO_replaceWithDeleteDeltaIn tests that a `Sync` delta for an +// object `O` with ID `X` is added when .Replace is called and `O` is among the +// replacement objects even if the RealFIFO already stores in terminal position +// a delta of type `Delete` for ID `X`. Not adding the `Sync` delta causes +// SharedIndexInformers to miss `O`'s create notification, see https://github.com/kubernetes/kubernetes/issues/83810 +// for more details. +func TestRealFIFO_replaceWithDeleteDeltaIn(t *testing.T) { + oldObj := mkFifoObj("foo", 1) + newObj := mkFifoObj("foo", 2) + + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{oldObj} + }), + nil, + ) + + f.Delete(oldObj) + f.Replace([]interface{}{newObj}, "") + + actualDeltas := f.getItems() + expectedDeltas := []Delta{ + {Type: Deleted, Object: oldObj}, + {Type: Replaced, Object: newObj}, + } + if !reflect.DeepEqual(expectedDeltas, actualDeltas) { + t.Errorf("expected %#v, got %#v", expectedDeltas, actualDeltas) + } +} + +func TestRealFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { + obj := mkFifoObj("foo", 2) + objV2 := mkFifoObj("foo", 3) + table := []struct { + name string + operations func(f *RealFIFO) + expectedDeltas Deltas + }{ + { + name: "Added object should be deleted on Replace", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + //{ + // // ATTENTION: difference with delta_fifo_test, there is no option for emitDeltaTypeReplaced + // name: "Replaced object should have only a single Delete", + // operations: func(f *RealFIFO) { + // f.emitDeltaTypeReplaced = true + // f.Add(obj) + // f.Replace([]interface{}{obj}, "0") + // f.Replace([]interface{}{}, "0") + // }, + // expectedDeltas: Deltas{ + // {Added, obj}, + // {Replaced, obj}, + // {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + // }, + //}, + { + name: "Deleted object should have only a single Delete", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Delete(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + }, + }, + { + name: "Synced objects should have a single delete", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Replaced, obj}, + {Replaced, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added objects should have a single delete on multiple Replaces", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + f.Replace([]interface{}{}, "1") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added and deleted and added object should be deleted", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Delete(obj) + f.Add(objV2) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + {Added, objV2}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, + }, + }, + } + for _, tt := range table { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + // Test with a RealFIFO with a backing KnownObjects + fWithKnownObjects := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{} + }), + nil, + ) + tt.operations(fWithKnownObjects) + actualDeltasWithKnownObjects := popN(fWithKnownObjects, len(fWithKnownObjects.getItems())) + actualAsDeltas := collapseDeltas(actualDeltasWithKnownObjects) + if !reflect.DeepEqual(tt.expectedDeltas, actualAsDeltas) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualAsDeltas) + } + if len(fWithKnownObjects.items) != 0 { + t.Errorf("expected no extra deltas (empty map), got %#v", fWithKnownObjects.items) + } + + // ATTENTION: difference with delta_fifo_test, there is no option without knownObjects + }) + } +} + +func collapseDeltas(ins []interface{}) Deltas { + ret := Deltas{} + for _, curr := range ins { + for _, delta := range curr.(Deltas) { + ret = append(ret, delta) + } + } + return ret +} + +// ATTENTION: difference with delta_fifo_test, there is no requeue option anymore +// func TestDeltaFIFO_requeueOnPop(t *testing.T) { + +func TestRealFIFO_addUpdate(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("foo", 12)) + f.Delete(mkFifoObj("foo", 15)) + + // ATTENTION: difference with delta_fifo_test, all items on the list. DeltaFIFO.List only showed newest, but Pop processed all. + expected1 := []Delta{ + { + Type: Added, + Object: mkFifoObj("foo", 10), + }, + { + Type: Updated, + Object: mkFifoObj("foo", 12), + }, + { + Type: Deleted, + Object: mkFifoObj("foo", 15), + }, + } + if e, a := expected1, f.getItems(); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %+v, got %+v", e, a) + } + + got := make(chan testFifoObject, 4) + done := make(chan struct{}) + go func() { + defer close(done) + for { + obj := testRealFIFOPop(f) + if obj.name == closedFIFOName { + break + } + t.Logf("got a thing %#v", obj) + t.Logf("D len: %v", len(f.items)) + got <- obj + } + }() + + first := <-got + if e, a := 10, first.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + second := <-got + if e, a := 12, second.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + third := <-got + if e, a := 15, third.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + select { + case unexpected := <-got: + t.Errorf("Got second value %v", unexpected.val) + case <-time.After(50 * time.Millisecond): + } + + if e, a := 0, len(f.getItems()); e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + f.Close() + <-done +} + +func TestRealFIFO_transformer(t *testing.T) { + mk := func(name string, rv int) testFifoObject { + return mkFifoObj(name, &rvAndXfrm{rv, 0}) + } + xfrm := TransformFunc(func(obj interface{}) (interface{}, error) { + switch v := obj.(type) { + case testFifoObject: + v.val.(*rvAndXfrm).xfrm++ + case DeletedFinalStateUnknown: + if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 { + return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj) + } + default: + return nil, fmt.Errorf("unexpected object: %#v", obj) + } + return obj, nil + }) + + must := func(err error) { + if err != nil { + t.Fatal(err) + } + } + mustTransform := func(obj interface{}) interface{} { + ret, err := xfrm(obj) + must(err) + return ret + } + + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + xfrm, + ) + must(f.Add(mk("foo", 10))) + must(f.Add(mk("bar", 11))) + must(f.Update(mk("foo", 12))) + must(f.Delete(mk("foo", 15))) + must(f.Replace([]interface{}{}, "")) + must(f.Add(mk("bar", 16))) + must(f.Replace([]interface{}{}, "")) + + // ATTENTION: difference with delta_fifo_test, without compression, we keep all the items, including bar being deleted multiple times. + // DeltaFIFO starts by checking keys, we start by checking types and keys + expected1 := []Delta{ + {Type: Added, Object: mustTransform(mk("foo", 10))}, + {Type: Added, Object: mustTransform(mk("bar", 11))}, + {Type: Updated, Object: mustTransform(mk("foo", 12))}, + {Type: Deleted, Object: mustTransform(mk("foo", 15))}, + {Type: Deleted, Object: DeletedFinalStateUnknown{Key: "bar", Obj: mustTransform(mk("bar", 11))}}, + {Type: Added, Object: mustTransform(mk("bar", 16))}, + {Type: Deleted, Object: DeletedFinalStateUnknown{Key: "bar", Obj: mustTransform(mk("bar", 16))}}, + } + actual1 := f.getItems() + if len(expected1) != len(actual1) { + t.Fatalf("Expected %+v, got %+v", expected1, actual1) + } + for i := 0; i < len(actual1); i++ { + e := expected1[i] + a := actual1[i] + if e.Type != a.Type { + t.Errorf("%d Expected %+v, got %+v", i, e, a) + } + eKey, err := f.keyOf(e) + if err != nil { + t.Fatal(err) + } + aKey, err := f.keyOf(a) + if err != nil { + t.Fatal(err) + } + if eKey != aKey { + t.Errorf("%d Expected %+v, got %+v", i, eKey, aKey) + } + } + + for i := 0; i < len(expected1); i++ { + obj, err := f.Pop(func(o interface{}, isInInitialList bool) error { return nil }) + if err != nil { + t.Fatalf("got nothing on try %v?", i) + } + a := obj.(Deltas)[0] + e := expected1[i] + if !reflect.DeepEqual(e, a) { + t.Errorf("%d Expected %+v, got %+v", i, e, a) + } + } +} + +func TestRealFIFO_enqueueingNoLister(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("bar", 15)) + f.Add(mkFifoObj("qux", 17)) + f.Delete(mkFifoObj("qux", 18)) + + // RealFIFO queues everything + f.Delete(mkFifoObj("baz", 20)) + + // ATTENTION: difference with delta_fifo_test, without compression every item is queued + expectList := []int{10, 15, 17, 18, 20} + for _, expect := range expectList { + if e, a := expect, testRealFIFOPop(f).val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + } + if e, a := 0, len(f.getItems()); e != a { + t.Errorf("queue unexpectedly not empty: %v != %v\n%#v", e, a, f.getItems()) + } +} + +func TestRealFIFO_enqueueingWithLister(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("bar", 15)) + + // This delete does enqueue the deletion, because "baz" is in the key lister. + f.Delete(mkFifoObj("baz", 20)) + + expectList := []int{10, 15, 20} + for _, expect := range expectList { + if e, a := expect, testRealFIFOPop(f).val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + } + if e, a := 0, len(f.items); e != a { + t.Errorf("queue unexpectedly not empty: %v != %v", e, a) + } +} + +func TestRealFIFO_addReplace(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 15)}, "0") + got := make(chan testFifoObject, 3) + done := make(chan struct{}) + go func() { + defer close(done) + for { + obj := testRealFIFOPop(f) + if obj.name == closedFIFOName { + break + } + got <- obj + } + }() + + // ATTENTION: difference with delta_fifo_test, we get every event instead of the .Newest making us skip some for the test, but not at runtime. + curr := <-got + if e, a := 10, curr.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + curr = <-got + if e, a := 15, curr.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + + select { + case unexpected := <-got: + t.Errorf("Got second value %v", unexpected.val) + case <-time.After(50 * time.Millisecond): + } + + if items := f.getItems(); len(items) > 0 { + t.Errorf("item did not get removed") + } + f.Close() + <-done +} + +func TestRealFIFO_ResyncNonExisting(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + f.Delete(mkFifoObj("foo", 10)) + f.Resync() + + deltas := f.getItems() + if len(deltas) != 1 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[0].Type != Deleted { + t.Errorf("unexpected delta: %v", deltas[0]) + } +} + +func TestRealFIFO_Resync(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + f.Resync() + + deltas := f.getItems() + if len(deltas) != 1 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[0].Type != Sync { + t.Errorf("unexpected delta: %v", deltas[0]) + } +} + +func TestRealFIFO_DeleteExistingNonPropagated(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 5)) + f.Delete(mkFifoObj("foo", 6)) + + deltas := f.getItems() + if len(deltas) != 2 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[len(deltas)-1].Type != Deleted { + t.Errorf("unexpected delta: %v", deltas[len(deltas)-1]) + } +} + +func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { + // We test with only one pre-existing object because there is no + // promise about how their deletes are ordered. + + // Try it with a pre-existing Delete + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Delete(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + {{Deleted, mkFifoObj("baz", 10)}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try starting with an Add instead of a Delete + f = NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Add(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + // ATTENTION: difference with delta_fifo_test, every event is its own Deltas with one item + expectedList = []Deltas{ + {{Added, mkFifoObj("baz", 10)}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try deleting and recreating the object in the queue, then delete it by a Replace call + f = NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Delete(mkFifoObj("bar", 6)) + f.Add(mkFifoObj("bar", 100)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + // ATTENTION: difference with delta_fifo_test, every event is its own Deltas with one item + expectedList = []Deltas{ + {{Deleted, mkFifoObj("bar", 6)}}, + {{Added, mkFifoObj("bar", 100)}}, + // Since "bar" has a newer object in the queue than in the state, + // it should get a tombstone key with the latest object from the queue + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try syncing it first to ensure the delete use the latest version + f = NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Replace([]interface{}{mkFifoObj("bar", 100), mkFifoObj("foo", 5)}, "0") + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + // ATTENTION: difference with delta_fifo_test, every event is its own Deltas with one item + // ATTENTION: difference with delta_fifo_test, deltaFifo associated by key, but realFIFO orders across all keys, so this ordering changed + expectedList = []Deltas{ + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "baz" didn't have a delete event and wasn't in the Replace list + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {{Replaced, mkFifoObj("bar", 100)}}, + {{Replaced, mkFifoObj("foo", 5)}}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for i, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("%d Expected %#v, got %#v", i, e, a) + } + } + + // Now try starting without an explicit KeyListerGetter + f = NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList = []Deltas{ + {{Added, mkFifoObj("baz", 10)}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +// TestRealFIFO_ReplaceMakesDeletionsReplaced is the same as the above test, but +// ensures that a Replaced DeltaType is emitted. +func TestRealFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + + f.Delete(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 6)}, "0") + + expectedList := []Deltas{ + {{Deleted, mkFifoObj("baz", 10)}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Replaced, mkFifoObj("foo", 6)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +// ATTENTION: difference with delta_fifo_test, the previous value was hardcoded as use "Replace" so I've eliminated the option to set it differently +//func TestRealFIFO_ReplaceDeltaType(t *testing.T) { + +func TestRealFIFO_UpdateResyncRace(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + f.Update(mkFifoObj("foo", 6)) + f.Resync() + + expectedList := []Deltas{ + {{Updated, mkFifoObj("foo", 6)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +func TestRealFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + if f.HasSynced() { + t.Errorf("Expected HasSynced to be false") + } + cur, initial := pop2[Deltas](f) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + if initial != true { + t.Error("Expected initial list item") + } + } + if !f.HasSynced() { + t.Errorf("Expected HasSynced to be true") + } +} + +func TestRealFIFO_detectLineJumpers(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + + f.Add(mkFifoObj("foo", 10)) + f.Add(mkFifoObj("bar", 1)) + f.Add(mkFifoObj("foo", 11)) + f.Add(mkFifoObj("foo", 13)) + f.Add(mkFifoObj("zab", 30)) + + // ATTENTION: difference with delta_fifo_test, every event is delivered in order + + if e, a := 10, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + + f.Add(mkFifoObj("foo", 14)) // ensure foo doesn't jump back in line + + if e, a := 1, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 11, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 13, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 30, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 14, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } +} + +func TestRealFIFO_KeyOf(t *testing.T) { + f := RealFIFO{keyFunc: testFifoObjectKeyFunc} + + table := []struct { + obj interface{} + key string + }{ + {obj: testFifoObject{name: "A"}, key: "A"}, + {obj: DeletedFinalStateUnknown{Key: "B", Obj: nil}, key: "B"}, + {obj: Deltas{{Object: testFifoObject{name: "C"}}}, key: "C"}, + {obj: Deltas{{Object: DeletedFinalStateUnknown{Key: "D", Obj: nil}}}, key: "D"}, + } + + for _, item := range table { + got, err := f.keyOf(item.obj) + if err != nil { + t.Errorf("Unexpected error for %q: %v", item.obj, err) + continue + } + if e, a := item.key, got; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +} + +func TestRealFIFO_HasSynced(t *testing.T) { + tests := []struct { + actions []func(f *RealFIFO) + expectedSynced bool + }{ + { + actions: []func(f *RealFIFO){}, + expectedSynced: false, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Add(mkFifoObj("a", 1)) }, + }, + expectedSynced: true, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{}, "0") }, + }, + expectedSynced: true, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + }, + expectedSynced: false, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + func(f *RealFIFO) { Pop(f) }, + }, + expectedSynced: false, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + func(f *RealFIFO) { Pop(f) }, + func(f *RealFIFO) { Pop(f) }, + }, + expectedSynced: true, + }, + { + // This test case won't happen in practice since a Reflector, the only producer for delta_fifo today, always passes a complete snapshot consistent in time; + // there cannot be duplicate keys in the list or apiserver is broken. + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("a", 2)}, "0") }, + func(f *RealFIFO) { Pop(f) }, + // ATTENTION: difference with delta_fifo_test, every event is delivered, so a is listed twice and must be popped twice to remove both + func(f *RealFIFO) { Pop(f) }, + }, + expectedSynced: true, + }, + } + + for i, test := range tests { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + + for _, action := range test.actions { + action(f) + } + if e, a := test.expectedSynced, f.HasSynced(); a != e { + t.Errorf("test case %v failed, expected: %v , got %v", i, e, a) + } + } +} + +// TestRealFIFO_PopShouldUnblockWhenClosed checks that any blocking Pop on an empty queue +// should unblock and return after Close is called. +func TestRealFIFO_PopShouldUnblockWhenClosed(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + + c := make(chan struct{}) + const jobs = 10 + for i := 0; i < jobs; i++ { + go func() { + f.Pop(func(obj interface{}, isInInitialList bool) error { + return nil + }) + c <- struct{}{} + }() + } + + runtime.Gosched() + f.Close() + + for i := 0; i < jobs; i++ { + select { + case <-c: + case <-time.After(500 * time.Millisecond): + t.Fatalf("timed out waiting for Pop to return after Close") + } + } +}