diff --git a/staging/src/k8s.io/client-go/features/known_features.go b/staging/src/k8s.io/client-go/features/known_features.go index a74f6a83335..344d2ebb72b 100644 --- a/staging/src/k8s.io/client-go/features/known_features.go +++ b/staging/src/k8s.io/client-go/features/known_features.go @@ -53,6 +53,12 @@ const ( // alpha: v1.30 InformerResourceVersion Feature = "InformerResourceVersion" + // owner: @deads2k + // beta: v1.33 + // + // Refactor informers to deliver watch stream events in order instead of out of order. + InOrderInformers Feature = "InOrderInformers" + // owner: @p0lyn0mial // beta: v1.30 // @@ -73,5 +79,6 @@ var defaultKubernetesFeatureGates = map[Feature]FeatureSpec{ ClientsAllowCBOR: {Default: false, PreRelease: Alpha}, ClientsPreferCBOR: {Default: false, PreRelease: Alpha}, InformerResourceVersion: {Default: false, PreRelease: Alpha}, + InOrderInformers: {Default: true, PreRelease: Beta}, WatchListClient: {Default: false, PreRelease: Beta}, } diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index 41ead09e786..1497700d818 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -19,6 +19,7 @@ package cache import ( "context" "errors" + clientgofeaturegate "k8s.io/client-go/features" "sync" "time" @@ -72,13 +73,6 @@ type Config struct { // resync. ShouldResync ShouldResyncFunc - // If true, when Process() returns an error, re-enqueue the object. - // TODO: add interface to let you inject a delay/backoff or drop - // the object completely if desired. Pass the object in - // question to this interface as a parameter. This is probably moot - // now that this functionality appears at a higher level. - RetryOnError bool - // Called whenever the ListAndWatch drops the connection with an error. // // Contextual logging: WatchErrorHandlerWithContext should be used instead of WatchErrorHandler in code which supports contextual logging. @@ -213,15 +207,11 @@ func (c *controller) processLoop(ctx context.Context) { // TODO: Plumb through the ctx so that this can // actually exit when the controller is stopped. Or just give up on this stuff // ever being stoppable. - obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) + _, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } - if c.config.RetryOnError { - // This is the safe way to re-enqueue. - c.config.Queue.AddIfNotPresent(obj) - } } } } @@ -603,11 +593,17 @@ func newInformer(clientState Store, options InformerOptions) Controller { // This will hold incoming changes. Note how we pass clientState in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KnownObjects: clientState, - EmitDeltaTypeReplaced: true, - Transformer: options.Transform, - }) + + var fifo Queue + if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { + fifo = NewRealFIFO(MetaNamespaceKeyFunc, clientState, options.Transform) + } else { + fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: clientState, + EmitDeltaTypeReplaced: true, + Transformer: options.Transform, + }) + } cfg := &Config{ Queue: fifo, @@ -615,7 +611,6 @@ func newInformer(clientState Store, options InformerOptions) Controller { ObjectType: options.ObjectType, FullResyncPeriod: options.ResyncPeriod, MinWatchTimeout: options.MinWatchTimeout, - RetryOnError: false, Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { diff --git a/staging/src/k8s.io/client-go/tools/cache/controller_test.go b/staging/src/k8s.io/client-go/tools/cache/controller_test.go index dba2dfe6591..05425792057 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller_test.go @@ -49,10 +49,7 @@ func Example() { // This will hold incoming changes. Note how we pass downstream in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KeyFunction: MetaNamespaceKeyFunc, - KnownObjects: downstream, - }) + fifo := NewRealFIFO(MetaNamespaceKeyFunc, downstream, nil) // Let's do threadsafe output to get predictable test results. deletionCounter := make(chan string, 1000) @@ -62,7 +59,6 @@ func Example() { ListerWatcher: source, ObjectType: &v1.Pod{}, FullResyncPeriod: time.Millisecond * 100, - RetryOnError: false, // Let's implement a simple controller that just deletes // everything that comes in. @@ -88,7 +84,7 @@ func Example() { // fifo's KeyOf is easiest, because it handles // DeletedFinalStateUnknown markers. - key, err := fifo.KeyOf(newest.Object) + key, err := fifo.keyOf(newest.Object) if err != nil { return err } diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 4bb526cd039..264d7559a05 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -369,43 +369,6 @@ func (f *DeltaFIFO) Delete(obj interface{}) error { return f.queueActionLocked(Deleted, obj) } -// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already -// present in the set, it is neither enqueued nor added to the set. -// -// This is useful in a single producer/consumer scenario so that the consumer can -// safely retry items without contending with the producer and potentially enqueueing -// stale items. -// -// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is -// different from the Add/Update/Delete functions. -func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { - deltas, ok := obj.(Deltas) - if !ok { - return fmt.Errorf("object must be of type deltas, but got: %#v", obj) - } - id, err := f.KeyOf(deltas) - if err != nil { - return KeyError{obj, err} - } - f.lock.Lock() - defer f.lock.Unlock() - f.addIfNotPresent(id, deltas) - return nil -} - -// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller -// already holds the fifo lock. -func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { - f.populated = true - if _, exists := f.items[id]; exists { - return - } - - f.queue = append(f.queue, id) - f.items[id] = deltas - f.cond.Broadcast() -} - // re-listing and watching can deliver the same update multiple times in any // order. This will combine the most recent two deltas if they are the same. func dedupDeltas(deltas Deltas) Deltas { @@ -508,61 +471,6 @@ func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType Del return nil } -// List returns a list of all the items; it returns the object -// from the most recent Delta. -// You should treat the items returned inside the deltas as immutable. -func (f *DeltaFIFO) List() []interface{} { - f.lock.RLock() - defer f.lock.RUnlock() - return f.listLocked() -} - -func (f *DeltaFIFO) listLocked() []interface{} { - list := make([]interface{}, 0, len(f.items)) - for _, item := range f.items { - list = append(list, item.Newest().Object) - } - return list -} - -// ListKeys returns a list of all the keys of the objects currently -// in the FIFO. -func (f *DeltaFIFO) ListKeys() []string { - f.lock.RLock() - defer f.lock.RUnlock() - list := make([]string, 0, len(f.queue)) - for _, key := range f.queue { - list = append(list, key) - } - return list -} - -// Get returns the complete list of deltas for the requested item, -// or sets exists=false. -// You should treat the items returned inside the deltas as immutable. -func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { - key, err := f.KeyOf(obj) - if err != nil { - return nil, false, KeyError{obj, err} - } - return f.GetByKey(key) -} - -// GetByKey returns the complete list of deltas for the requested item, -// setting exists=false if that list is empty. -// You should treat the items returned inside the deltas as immutable. -func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { - f.lock.RLock() - defer f.lock.RUnlock() - d, exists := f.items[key] - if exists { - // Copy item's slice so operations on this slice - // won't interfere with the object we return. - d = copyDeltas(d) - } - return d, exists, nil -} - // IsClosed checks if the queue is closed func (f *DeltaFIFO) IsClosed() bool { f.lock.Lock() @@ -576,9 +484,7 @@ func (f *DeltaFIFO) IsClosed() bool { // is returned, so if you don't successfully process it, you need to add it back // with AddIfNotPresent(). // process function is called under lock, so it is safe to update data structures -// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc -// may return an instance of ErrRequeue with a nested error to indicate the current -// item should be requeued (equivalent to calling AddIfNotPresent under the lock). +// in it that need to be in sync with the queue (e.g. knownKeys). // process should avoid expensive I/O operation so that other queue operations, i.e. // Add() and Get(), won't be blocked for too long. // @@ -625,10 +531,6 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { defer trace.LogIfLong(100 * time.Millisecond) } err := process(item, isInInitialList) - if e, ok := err.(ErrRequeue); ok { - f.addIfNotPresent(id, item) - err = e.Err - } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go index 80994beb455..8f069eb1c57 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go @@ -17,7 +17,6 @@ limitations under the License. package cache import ( - "errors" "fmt" "reflect" "runtime" @@ -25,6 +24,66 @@ import ( "time" ) +// List returns a list of all the items; it returns the object +// from the most recent Delta. +// You should treat the items returned inside the deltas as immutable. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *DeltaFIFO) List() []interface{} { + f.lock.RLock() + defer f.lock.RUnlock() + return f.listLocked() +} + +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *DeltaFIFO) listLocked() []interface{} { + list := make([]interface{}, 0, len(f.items)) + for _, item := range f.items { + list = append(list, item.Newest().Object) + } + return list +} + +// ListKeys returns a list of all the keys of the objects currently +// in the FIFO. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *DeltaFIFO) ListKeys() []string { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]string, 0, len(f.queue)) + for _, key := range f.queue { + list = append(list, key) + } + return list +} + +// Get returns the complete list of deltas for the requested item, +// or sets exists=false. +// You should treat the items returned inside the deltas as immutable. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { + key, err := f.KeyOf(obj) + if err != nil { + return nil, false, KeyError{obj, err} + } + return f.GetByKey(key) +} + +// GetByKey returns the complete list of deltas for the requested item, +// setting exists=false if that list is empty. +// You should treat the items returned inside the deltas as immutable. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { + f.lock.RLock() + defer f.lock.RUnlock() + d, exists := f.items[key] + if exists { + // Copy item's slice so operations on this slice + // won't interfere with the object we return. + d = copyDeltas(d) + } + return d, exists, nil +} + // helper function to reduce stuttering func testPop(f *DeltaFIFO) testFifoObject { return Pop(f).(Deltas).Newest().Object.(testFifoObject) @@ -245,50 +304,6 @@ func TestDeltaFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { } } -func TestDeltaFIFO_requeueOnPop(t *testing.T) { - f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) - - f.Add(mkFifoObj("foo", 10)) - _, err := f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: nil} - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: fmt.Errorf("test error")} - }) - if err == nil || err.Error() != "test error" { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return nil - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); ok || err != nil { - t.Fatalf("object should have been removed: %t %v", ok, err) - } -} - func TestDeltaFIFO_addUpdate(t *testing.T) { f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) f.Add(mkFifoObj("foo", 10)) @@ -821,39 +836,6 @@ func TestDeltaFIFO_detectLineJumpers(t *testing.T) { } } -func TestDeltaFIFO_addIfNotPresent(t *testing.T) { - f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc}) - - emptyDeltas := Deltas{} - if err := f.AddIfNotPresent(emptyDeltas); err == nil || !errors.Is(err, ErrZeroLengthDeltasObject) { - t.Errorf("Expected error '%v', got %v", ErrZeroLengthDeltasObject, err) - } - - f.Add(mkFifoObj("b", 3)) - b3 := Pop(f) - f.Add(mkFifoObj("c", 4)) - c4 := Pop(f) - if e, a := 0, len(f.items); e != a { - t.Fatalf("Expected %v, got %v items in queue", e, a) - } - - f.Add(mkFifoObj("a", 1)) - f.Add(mkFifoObj("b", 2)) - f.AddIfNotPresent(b3) - f.AddIfNotPresent(c4) - - if e, a := 3, len(f.items); a != e { - t.Fatalf("expected queue length %d, got %d", e, a) - } - - expectedValues := []int{1, 2, 4} - for _, expected := range expectedValues { - if actual := testPop(f).val; actual != expected { - t.Fatalf("expected value %d, got %d", expected, actual) - } - } -} - func TestDeltaFIFO_KeyOf(t *testing.T) { f := DeltaFIFO{keyFunc: testFifoObjectKeyFunc} diff --git a/staging/src/k8s.io/client-go/tools/cache/fifo.go b/staging/src/k8s.io/client-go/tools/cache/fifo.go index dd13c4ea774..5c2ca90084d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/fifo.go @@ -27,30 +27,16 @@ import ( // It is supposed to process the accumulator popped from the queue. type PopProcessFunc func(obj interface{}, isInInitialList bool) error -// ErrRequeue may be returned by a PopProcessFunc to safely requeue -// the current item. The value of Err will be returned from Pop. -type ErrRequeue struct { - // Err is returned by the Pop function - Err error -} - // ErrFIFOClosed used when FIFO is closed var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue") -func (e ErrRequeue) Error() string { - if e.Err == nil { - return "the popped item should be requeued without returning an error" - } - return e.Err.Error() -} - -// Queue extends Store with a collection of Store keys to "process". +// Queue extends ReflectorStore with a collection of Store keys to "process". // Every Add, Update, or Delete may put the object's key in that collection. // A Queue has a way to derive the corresponding key given an accumulator. // A Queue can be accessed concurrently from multiple goroutines. // A Queue can be "closed", after which Pop operations return an error. type Queue interface { - Store + ReflectorStore // Pop blocks until there is at least one key to process or the // Queue is closed. In the latter case Pop returns with an error. @@ -64,11 +50,6 @@ type Queue interface { // Pop. Pop(PopProcessFunc) (interface{}, error) - // AddIfNotPresent puts the given accumulator into the Queue (in - // association with the accumulator's key) if and only if that key - // is not already associated with a non-empty accumulator. - AddIfNotPresent(interface{}) error - // HasSynced returns true if the first batch of keys have all been // popped. The first batch of keys are those of the first Replace // operation if that happened before any Add, AddIfNotPresent, @@ -177,36 +158,6 @@ func (f *FIFO) Add(obj interface{}) error { return nil } -// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already -// present in the set, it is neither enqueued nor added to the set. -// -// This is useful in a single producer/consumer scenario so that the consumer can -// safely retry items without contending with the producer and potentially enqueueing -// stale items. -func (f *FIFO) AddIfNotPresent(obj interface{}) error { - id, err := f.keyFunc(obj) - if err != nil { - return KeyError{obj, err} - } - f.lock.Lock() - defer f.lock.Unlock() - f.addIfNotPresent(id, obj) - return nil -} - -// addIfNotPresent assumes the fifo lock is already held and adds the provided -// item to the queue under id if it does not already exist. -func (f *FIFO) addIfNotPresent(id string, obj interface{}) { - f.populated = true - if _, exists := f.items[id]; exists { - return - } - - f.queue = append(f.queue, id) - f.items[id] = obj - f.cond.Broadcast() -} - // Update is the same as Add in this implementation. func (f *FIFO) Update(obj interface{}) error { return f.Add(obj) @@ -227,46 +178,6 @@ func (f *FIFO) Delete(obj interface{}) error { return err } -// List returns a list of all the items. -func (f *FIFO) List() []interface{} { - f.lock.RLock() - defer f.lock.RUnlock() - list := make([]interface{}, 0, len(f.items)) - for _, item := range f.items { - list = append(list, item) - } - return list -} - -// ListKeys returns a list of all the keys of the objects currently -// in the FIFO. -func (f *FIFO) ListKeys() []string { - f.lock.RLock() - defer f.lock.RUnlock() - list := make([]string, 0, len(f.items)) - for key := range f.items { - list = append(list, key) - } - return list -} - -// Get returns the requested item, or sets exists=false. -func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { - key, err := f.keyFunc(obj) - if err != nil { - return nil, false, KeyError{obj, err} - } - return f.GetByKey(key) -} - -// GetByKey returns the requested item, or sets exists=false. -func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { - f.lock.RLock() - defer f.lock.RUnlock() - item, exists = f.items[key] - return item, exists, nil -} - // IsClosed checks if the queue is closed func (f *FIFO) IsClosed() bool { f.lock.Lock() @@ -307,10 +218,6 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { } delete(f.items, id) err := process(item, isInInitialList) - if e, ok := err.(ErrRequeue); ok { - f.addIfNotPresent(id, item) - err = e.Err - } return item, err } } diff --git a/staging/src/k8s.io/client-go/tools/cache/fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/fifo_test.go index 655f1378539..1831889b09d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/fifo_test.go @@ -17,13 +17,56 @@ limitations under the License. package cache import ( - "fmt" "reflect" "runtime" "testing" "time" ) +// List returns a list of all the items. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *FIFO) List() []interface{} { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]interface{}, 0, len(f.items)) + for _, item := range f.items { + list = append(list, item) + } + return list +} + +// ListKeys returns a list of all the keys of the objects currently +// in the FIFO. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *FIFO) ListKeys() []string { + f.lock.RLock() + defer f.lock.RUnlock() + list := make([]string, 0, len(f.items)) + for key := range f.items { + list = append(list, key) + } + return list +} + +// Get returns the requested item, or sets exists=false. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { + key, err := f.keyFunc(obj) + if err != nil { + return nil, false, KeyError{obj, err} + } + return f.GetByKey(key) +} + +// GetByKey returns the requested item, or sets exists=false. +// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. +func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { + f.lock.RLock() + defer f.lock.RUnlock() + item, exists = f.items[key] + return item, exists, nil +} + func testFifoObjectKeyFunc(obj interface{}) (string, error) { return obj.(testFifoObject).name, nil } @@ -72,50 +115,6 @@ func TestFIFO_basic(t *testing.T) { } } -func TestFIFO_requeueOnPop(t *testing.T) { - f := NewFIFO(testFifoObjectKeyFunc) - - f.Add(mkFifoObj("foo", 10)) - _, err := f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: nil} - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return ErrRequeue{Err: fmt.Errorf("test error")} - }) - if err == nil || err.Error() != "test error" { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); !ok || err != nil { - t.Fatalf("object should have been requeued: %t %v", ok, err) - } - - _, err = f.Pop(func(obj interface{}, isInInitialList bool) error { - if obj.(testFifoObject).name != "foo" { - t.Fatalf("unexpected object: %#v", obj) - } - return nil - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if _, ok, err := f.GetByKey("foo"); ok || err != nil { - t.Fatalf("object should have been removed: %t %v", ok, err) - } -} - func TestFIFO_addUpdate(t *testing.T) { f := NewFIFO(testFifoObjectKeyFunc) f.Add(mkFifoObj("foo", 10)) @@ -204,26 +203,6 @@ func TestFIFO_detectLineJumpers(t *testing.T) { } } -func TestFIFO_addIfNotPresent(t *testing.T) { - f := NewFIFO(testFifoObjectKeyFunc) - - f.Add(mkFifoObj("a", 1)) - f.Add(mkFifoObj("b", 2)) - f.AddIfNotPresent(mkFifoObj("b", 3)) - f.AddIfNotPresent(mkFifoObj("c", 4)) - - if e, a := 3, len(f.items); a != e { - t.Fatalf("expected queue length %d, got %d", e, a) - } - - expectedValues := []int{1, 2, 4} - for _, expected := range expectedValues { - if actual := Pop(f).(testFifoObject).val; actual != expected { - t.Fatalf("expected value %d, got %d", expected, actual) - } - } -} - func TestFIFO_HasSynced(t *testing.T) { tests := []struct { actions []func(f *FIFO) diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index f8dac4f9ed4..0d054df43b1 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -55,6 +55,28 @@ var ( defaultMinWatchTimeout = 5 * time.Minute ) +// ReflectorStore is the subset of cache.Store that the reflector uses +type ReflectorStore interface { + // Add adds the given object to the accumulator associated with the given object's key + Add(obj interface{}) error + + // Update updates the given object in the accumulator associated with the given object's key + Update(obj interface{}) error + + // Delete deletes the given object from the accumulator associated with the given object's key + Delete(obj interface{}) error + + // Replace will delete the contents of the store, using instead the + // given list. Store takes ownership of the list, you should not reference + // it after calling this function. + Replace([]interface{}, string) error + + // Resync is meaningless in the terms appearing here but has + // meaning in some implementations that have non-trivial + // additional behavior (e.g., DeltaFIFO). + Resync() error +} + // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { // name identifies this reflector. By default, it will be a file:line if possible. @@ -72,7 +94,7 @@ type Reflector struct { // The GVK of the object we expect to place in the store if unstructured. expectedGVK *schema.GroupVersionKind // The destination to sync up with the watch source - store Store + store ReflectorStore // listerWatcher is used to perform lists and watches. listerWatcher ListerWatcher // backoff manages backoff of ListWatch @@ -189,13 +211,13 @@ func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interfa // NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack // that is outside this package. See NewReflectorWithOptions for further information. -func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { +func NewReflector(lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector { return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod}) } // NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further // information. -func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { +func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector { return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod}) } @@ -234,7 +256,7 @@ type ReflectorOptions struct { // "yes". This enables you to use reflectors to periodically process // everything as well as incrementally processing the things that // change. -func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector { +func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store ReflectorStore, options ReflectorOptions) *Reflector { reflectorClock := options.Clock if reflectorClock == nil { reflectorClock = clock.RealClock{} @@ -798,7 +820,7 @@ func handleWatch( ctx context.Context, start time.Time, w watch.Interface, - store Store, + store ReflectorStore, expectedType reflect.Type, expectedGVK *schema.GroupVersionKind, name string, @@ -826,7 +848,7 @@ func handleAnyWatch( ctx context.Context, start time.Time, w watch.Interface, - store Store, + store ReflectorStore, expectedType reflect.Type, expectedGVK *schema.GroupVersionKind, name string, diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 49dd1edc976..a8156a28687 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -540,11 +540,16 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { s.startedLock.Lock() defer s.startedLock.Unlock() - fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ - KnownObjects: s.indexer, - EmitDeltaTypeReplaced: true, - Transformer: s.transform, - }) + var fifo Queue + if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { + fifo = NewRealFIFO(MetaNamespaceKeyFunc, s.indexer, s.transform) + } else { + fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: s.indexer, + EmitDeltaTypeReplaced: true, + Transformer: s.transform, + }) + } cfg := &Config{ Queue: fifo, @@ -552,7 +557,6 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { ObjectType: s.objectType, ObjectDescription: s.objectDescription, FullResyncPeriod: s.resyncCheckPeriod, - RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, diff --git a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go new file mode 100644 index 00000000000..9be14ff3811 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go @@ -0,0 +1,407 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + utiltrace "k8s.io/utils/trace" + "sync" + "time" +) + +// RealFIFO is a Queue in which every notification from the Reflector is passed +// in order to the Queue via Pop. +// This means that it +// 1. delivers notifications for items that have been deleted +// 2. delivers multiple notifications per item instead of simply the most recent value +type RealFIFO struct { + lock sync.RWMutex + cond sync.Cond + + items []Delta + + // populated is true if the first batch of items inserted by Replace() has been populated + // or Delete/Add/Update was called first. + populated bool + // initialPopulationCount is the number of items inserted by the first call of Replace() + initialPopulationCount int + + // keyFunc is used to make the key used for queued item insertion and retrieval, and + // should be deterministic. + keyFunc KeyFunc + + // knownObjects list keys that are "known" --- affecting Delete(), + // Replace(), and Resync() + knownObjects KeyListerGetter + + // Indication the queue is closed. + // Used to indicate a queue is closed so a control loop can exit when a queue is empty. + // Currently, not used to gate any of CRUD operations. + closed bool + + // Called with every object if non-nil. + transformer TransformFunc +} + +var ( + _ = Queue(&RealFIFO{}) // RealFIFO is a Queue +) + +// Close the queue. +func (f *RealFIFO) Close() { + f.lock.Lock() + defer f.lock.Unlock() + f.closed = true + f.cond.Broadcast() +} + +// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or +// DeletedFinalStateUnknown objects. +func (f *RealFIFO) keyOf(obj interface{}) (string, error) { + if d, ok := obj.(Deltas); ok { + if len(d) == 0 { + return "", KeyError{obj, ErrZeroLengthDeltasObject} + } + obj = d.Newest().Object + } + if d, ok := obj.(Delta); ok { + obj = d.Object + } + if d, ok := obj.(DeletedFinalStateUnknown); ok { + return d.Key, nil + } + return f.keyFunc(obj) +} + +// HasSynced returns true if an Add/Update/Delete are called first, +// or the first batch of items inserted by Replace() has been popped. +func (f *RealFIFO) HasSynced() bool { + f.lock.Lock() + defer f.lock.Unlock() + return f.hasSynced_locked() +} + +// ignoring lint to reduce delta to the original for review. It's ok adjust later. +// +//lint:file-ignore ST1003: should not use underscores in Go names +func (f *RealFIFO) hasSynced_locked() bool { + return f.populated && f.initialPopulationCount == 0 +} + +// addToItems_locked appends to the delta list. +func (f *RealFIFO) addToItems_locked(deltaActionType DeltaType, skipTransform bool, obj interface{}) error { + // we must be able to read the keys in order to determine whether the knownObjcts and the items + // in this FIFO overlap + _, err := f.keyOf(obj) + if err != nil { + return KeyError{obj, err} + } + + // Every object comes through this code path once, so this is a good + // place to call the transform func. + // + // If obj is a DeletedFinalStateUnknown tombstone or the action is a Sync, + // then the object have already gone through the transformer. + // + // If the objects already present in the cache are passed to Replace(), + // the transformer must be idempotent to avoid re-mutating them, + // or coordinate with all readers from the cache to avoid data races. + // Default informers do not pass existing objects to Replace. + if f.transformer != nil { + _, isTombstone := obj.(DeletedFinalStateUnknown) + if !isTombstone && !skipTransform { + var err error + obj, err = f.transformer(obj) + if err != nil { + return err + } + } + } + + f.items = append(f.items, Delta{ + Type: deltaActionType, + Object: obj, + }) + f.cond.Broadcast() + + return nil +} + +// Add inserts an item, and puts it in the queue. The item is only enqueued +// if it doesn't already exist in the set. +func (f *RealFIFO) Add(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + + f.populated = true + retErr := f.addToItems_locked(Added, false, obj) + + return retErr +} + +// Update is the same as Add in this implementation. +func (f *RealFIFO) Update(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + + f.populated = true + retErr := f.addToItems_locked(Updated, false, obj) + + return retErr +} + +// Delete removes an item. It doesn't add it to the queue, because +// this implementation assumes the consumer only cares about the objects, +// not the order in which they were created/added. +func (f *RealFIFO) Delete(obj interface{}) error { + f.lock.Lock() + defer f.lock.Unlock() + + f.populated = true + retErr := f.addToItems_locked(Deleted, false, obj) + + return retErr +} + +// IsClosed checks if the queue is closed +func (f *RealFIFO) IsClosed() bool { + f.lock.Lock() + defer f.lock.Unlock() + return f.closed +} + +// Pop waits until an item is ready and processes it. If multiple items are +// ready, they are returned in the order in which they were added/updated. +// The item is removed from the queue (and the store) before it is processed. +// process function is called under lock, so it is safe +// update data structures in it that need to be in sync with the queue. +func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { + f.lock.Lock() + defer f.lock.Unlock() + + for len(f.items) == 0 { + // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. + // When Close() is called, the f.closed is set and the condition is broadcasted. + // Which causes this loop to continue and return from the Pop(). + if f.closed { + return nil, ErrFIFOClosed + } + + f.cond.Wait() + } + + isInInitialList := !f.hasSynced_locked() + item := f.items[0] + // The underlying array still exists and references this object, so the object will not be garbage collected unless we zero the reference. + f.items[0] = Delta{} + f.items = f.items[1:] + if f.initialPopulationCount > 0 { + f.initialPopulationCount-- + } + + // Only log traces if the queue depth is greater than 10 and it takes more than + // 100 milliseconds to process one item from the queue. + // Queue depth never goes high because processing an item is locking the queue, + // and new items can't be added until processing finish. + // https://github.com/kubernetes/kubernetes/issues/103789 + if len(f.items) > 10 { + id, _ := f.keyOf(item) + trace := utiltrace.New("RealFIFO Pop Process", + utiltrace.Field{Key: "ID", Value: id}, + utiltrace.Field{Key: "Depth", Value: len(f.items)}, + utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"}) + defer trace.LogIfLong(100 * time.Millisecond) + } + + // we wrap in Deltas here to be compatible with preview Pop functions and those interpreting the return value. + err := process(Deltas{item}, isInInitialList) + return Deltas{item}, err +} + +// Replace +// 1. finds those items in f.items that are not in newItems and creates synthetic deletes for them +// 2. finds items in knownObjects that are not in newItems and creates synthetic deletes for them +// 3. adds the newItems to the queue +func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error { + f.lock.Lock() + defer f.lock.Unlock() + + // determine the keys of everything we're adding. We cannot add the items until after the synthetic deletes have been + // created for items that don't existing in newItems + newKeys := sets.Set[string]{} + for _, obj := range newItems { + key, err := f.keyOf(obj) + if err != nil { + return KeyError{obj, err} + } + newKeys.Insert(key) + } + + queuedItems := f.items + queuedKeys := []string{} + lastQueuedItemForKey := map[string]Delta{} + for _, queuedItem := range queuedItems { + queuedKey, err := f.keyOf(queuedItem.Object) + if err != nil { + return KeyError{queuedItem.Object, err} + } + + if _, seen := lastQueuedItemForKey[queuedKey]; !seen { + queuedKeys = append(queuedKeys, queuedKey) + } + lastQueuedItemForKey[queuedKey] = queuedItem + } + + // all the deletes already in the queue are important. There are two cases + // 1. queuedItems has delete for key/X and newItems has replace for key/X. This means the queued UID was deleted and a new one was created. + // 2. queuedItems has a delete for key/X and newItems does NOT have key/X. This means the queued item was deleted. + // Do deletion detection against objects in the queue. + for _, queuedKey := range queuedKeys { + if newKeys.Has(queuedKey) { + continue + } + + // Delete pre-existing items not in the new list. + // This could happen if watch deletion event was missed while + // disconnected from apiserver. + lastQueuedItem := lastQueuedItemForKey[queuedKey] + // if we've already got the item marked as deleted, no need to add another delete + if lastQueuedItem.Type == Deleted { + continue + } + + // if we got here, then the last entry we have for the queued item is *not* a deletion and we need to add a delete + deletedObj := lastQueuedItem.Object + + retErr := f.addToItems_locked(Deleted, true, DeletedFinalStateUnknown{ + Key: queuedKey, + Obj: deletedObj, + }) + if retErr != nil { + return fmt.Errorf("couldn't enqueue object: %w", retErr) + } + } + + // Detect deletions for objects not present in the queue, but present in KnownObjects + knownKeys := f.knownObjects.ListKeys() + for _, knownKey := range knownKeys { + if newKeys.Has(knownKey) { // still present + continue + } + if _, inQueuedItems := lastQueuedItemForKey[knownKey]; inQueuedItems { // already added delete for these + continue + } + + deletedObj, exists, err := f.knownObjects.GetByKey(knownKey) + if err != nil { + deletedObj = nil + utilruntime.HandleError(fmt.Errorf("error during lookup, placing DeleteFinalStateUnknown marker without object: key=%q, err=%w", knownKey, err)) + } else if !exists { + deletedObj = nil + utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object: key=%q", knownKey)) + } + retErr := f.addToItems_locked(Deleted, false, DeletedFinalStateUnknown{ + Key: knownKey, + Obj: deletedObj, + }) + if retErr != nil { + return fmt.Errorf("couldn't enqueue object: %w", retErr) + } + } + + // now that we have the deletes we need for items, we can add the newItems to the items queue + for _, obj := range newItems { + retErr := f.addToItems_locked(Replaced, false, obj) + if retErr != nil { + return fmt.Errorf("couldn't enqueue object: %w", retErr) + } + } + + if !f.populated { + f.populated = true + f.initialPopulationCount = len(f.items) + } + + return nil +} + +// Resync will ensure that every object in the Store has its key in the queue. +// This should be a no-op, because that property is maintained by all operations. +func (f *RealFIFO) Resync() error { + // TODO this cannot logically be done by the FIFO, it can only be done by the indexer + f.lock.Lock() + defer f.lock.Unlock() + + if f.knownObjects == nil { + return nil + } + + keysInQueue := sets.Set[string]{} + for _, item := range f.items { + key, err := f.keyOf(item.Object) + if err != nil { + return KeyError{item, err} + } + keysInQueue.Insert(key) + } + + knownKeys := f.knownObjects.ListKeys() + for _, knownKey := range knownKeys { + // If we are doing Resync() and there is already an event queued for that object, + // we ignore the Resync for it. This is to avoid the race, in which the resync + // comes with the previous value of object (since queueing an event for the object + // doesn't trigger changing the underlying store . + if keysInQueue.Has(knownKey) { + continue + } + + knownObj, exists, err := f.knownObjects.GetByKey(knownKey) + if err != nil { + utilruntime.HandleError(fmt.Errorf("unable to queue object for sync: key=%q, err=%w", knownKey, err)) + continue + } else if !exists { + utilruntime.HandleError(fmt.Errorf("key does not exist in known objects store, unable to queue object for sync: key=%q", knownKey)) + continue + } + + retErr := f.addToItems_locked(Sync, true, knownObj) + if retErr != nil { + return fmt.Errorf("couldn't queue object: %w", err) + } + } + + return nil +} + +// NewRealFIFO returns a Store which can be used to queue up items to +// process. +func NewRealFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter, transformer TransformFunc) *RealFIFO { + if knownObjects == nil { + panic("coding error: knownObjects must be provided") + } + + f := &RealFIFO{ + items: make([]Delta, 0, 10), + keyFunc: keyFunc, + knownObjects: knownObjects, + transformer: transformer, + } + f.cond.L = &f.lock + return f +} diff --git a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo_test.go new file mode 100644 index 00000000000..649ea368723 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo_test.go @@ -0,0 +1,976 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "reflect" + "runtime" + "testing" + "time" +) + +func (f *RealFIFO) getItems() []Delta { + f.lock.Lock() + defer f.lock.Unlock() + + ret := make([]Delta, len(f.items)) + copy(ret, f.items) + return ret +} + +const closedFIFOName = "FIFO WAS CLOSED" + +func popN(queue Queue, count int) []interface{} { + result := []interface{}{} + for i := 0; i < count; i++ { + queue.Pop(func(obj interface{}, isInInitialList bool) error { + result = append(result, obj) + return nil + }) + } + return result +} + +// helper function to reduce stuttering +func testRealFIFOPop(f *RealFIFO) testFifoObject { + val := Pop(f) + if val == nil { + return testFifoObject{name: closedFIFOName} + } + return val.(Deltas).Newest().Object.(testFifoObject) +} + +func emptyKnownObjects() KeyListerGetter { + return literalListerGetter( + func() []testFifoObject { + return []testFifoObject{} + }, + ) +} + +func TestRealFIFO_basic(t *testing.T) { + f := NewRealFIFO(testFifoObjectKeyFunc, emptyKnownObjects(), nil) + const amount = 500 + go func() { + for i := 0; i < amount; i++ { + f.Add(mkFifoObj(string([]rune{'a', rune(i)}), i+1)) + } + }() + go func() { + for u := uint64(0); u < amount; u++ { + f.Add(mkFifoObj(string([]rune{'b', rune(u)}), u+1)) + } + }() + + lastInt := int(0) + lastUint := uint64(0) + for i := 0; i < amount*2; i++ { + switch obj := testRealFIFOPop(f).val.(type) { + case int: + if obj <= lastInt { + t.Errorf("got %v (int) out of order, last was %v", obj, lastInt) + } + lastInt = obj + case uint64: + if obj <= lastUint { + t.Errorf("got %v (uint) out of order, last was %v", obj, lastUint) + } else { + lastUint = obj + } + default: + t.Fatalf("unexpected type %#v", obj) + } + } +} + +// TestRealFIFO_replaceWithDeleteDeltaIn tests that a `Sync` delta for an +// object `O` with ID `X` is added when .Replace is called and `O` is among the +// replacement objects even if the RealFIFO already stores in terminal position +// a delta of type `Delete` for ID `X`. Not adding the `Sync` delta causes +// SharedIndexInformers to miss `O`'s create notification, see https://github.com/kubernetes/kubernetes/issues/83810 +// for more details. +func TestRealFIFO_replaceWithDeleteDeltaIn(t *testing.T) { + oldObj := mkFifoObj("foo", 1) + newObj := mkFifoObj("foo", 2) + + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{oldObj} + }), + nil, + ) + + f.Delete(oldObj) + f.Replace([]interface{}{newObj}, "") + + actualDeltas := f.getItems() + expectedDeltas := []Delta{ + {Type: Deleted, Object: oldObj}, + {Type: Replaced, Object: newObj}, + } + if !reflect.DeepEqual(expectedDeltas, actualDeltas) { + t.Errorf("expected %#v, got %#v", expectedDeltas, actualDeltas) + } +} + +func TestRealFIFOW_ReplaceMakesDeletionsForObjectsOnlyInQueue(t *testing.T) { + obj := mkFifoObj("foo", 2) + objV2 := mkFifoObj("foo", 3) + table := []struct { + name string + operations func(f *RealFIFO) + expectedDeltas Deltas + }{ + { + name: "Added object should be deleted on Replace", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + //{ + // // ATTENTION: difference with delta_fifo_test, there is no option for emitDeltaTypeReplaced + // name: "Replaced object should have only a single Delete", + // operations: func(f *RealFIFO) { + // f.emitDeltaTypeReplaced = true + // f.Add(obj) + // f.Replace([]interface{}{obj}, "0") + // f.Replace([]interface{}{}, "0") + // }, + // expectedDeltas: Deltas{ + // {Added, obj}, + // {Replaced, obj}, + // {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + // }, + //}, + { + name: "Deleted object should have only a single Delete", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Delete(obj) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + }, + }, + { + name: "Synced objects should have a single delete", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{obj}, "0") + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Replaced, obj}, + {Replaced, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added objects should have a single delete on multiple Replaces", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Replace([]interface{}{}, "0") + f.Replace([]interface{}{}, "1") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: obj}}, + }, + }, + { + name: "Added and deleted and added object should be deleted", + operations: func(f *RealFIFO) { + f.Add(obj) + f.Delete(obj) + f.Add(objV2) + f.Replace([]interface{}{}, "0") + }, + expectedDeltas: Deltas{ + {Added, obj}, + {Deleted, obj}, + {Added, objV2}, + {Deleted, DeletedFinalStateUnknown{Key: "foo", Obj: objV2}}, + }, + }, + } + for _, tt := range table { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + // Test with a RealFIFO with a backing KnownObjects + fWithKnownObjects := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{} + }), + nil, + ) + tt.operations(fWithKnownObjects) + actualDeltasWithKnownObjects := popN(fWithKnownObjects, len(fWithKnownObjects.getItems())) + actualAsDeltas := collapseDeltas(actualDeltasWithKnownObjects) + if !reflect.DeepEqual(tt.expectedDeltas, actualAsDeltas) { + t.Errorf("expected %#v, got %#v", tt.expectedDeltas, actualAsDeltas) + } + if len(fWithKnownObjects.items) != 0 { + t.Errorf("expected no extra deltas (empty map), got %#v", fWithKnownObjects.items) + } + + // ATTENTION: difference with delta_fifo_test, there is no option without knownObjects + }) + } +} + +func collapseDeltas(ins []interface{}) Deltas { + ret := Deltas{} + for _, curr := range ins { + for _, delta := range curr.(Deltas) { + ret = append(ret, delta) + } + } + return ret +} + +// ATTENTION: difference with delta_fifo_test, there is no requeue option anymore +// func TestDeltaFIFO_requeueOnPop(t *testing.T) { + +func TestRealFIFO_addUpdate(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("foo", 12)) + f.Delete(mkFifoObj("foo", 15)) + + // ATTENTION: difference with delta_fifo_test, all items on the list. DeltaFIFO.List only showed newest, but Pop processed all. + expected1 := []Delta{ + { + Type: Added, + Object: mkFifoObj("foo", 10), + }, + { + Type: Updated, + Object: mkFifoObj("foo", 12), + }, + { + Type: Deleted, + Object: mkFifoObj("foo", 15), + }, + } + if e, a := expected1, f.getItems(); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %+v, got %+v", e, a) + } + + got := make(chan testFifoObject, 4) + done := make(chan struct{}) + go func() { + defer close(done) + for { + obj := testRealFIFOPop(f) + if obj.name == closedFIFOName { + break + } + t.Logf("got a thing %#v", obj) + t.Logf("D len: %v", len(f.items)) + got <- obj + } + }() + + first := <-got + if e, a := 10, first.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + second := <-got + if e, a := 12, second.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + third := <-got + if e, a := 15, third.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + select { + case unexpected := <-got: + t.Errorf("Got second value %v", unexpected.val) + case <-time.After(50 * time.Millisecond): + } + + if e, a := 0, len(f.getItems()); e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + f.Close() + <-done +} + +func TestRealFIFO_transformer(t *testing.T) { + mk := func(name string, rv int) testFifoObject { + return mkFifoObj(name, &rvAndXfrm{rv, 0}) + } + xfrm := TransformFunc(func(obj interface{}) (interface{}, error) { + switch v := obj.(type) { + case testFifoObject: + v.val.(*rvAndXfrm).xfrm++ + case DeletedFinalStateUnknown: + if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 { + return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj) + } + default: + return nil, fmt.Errorf("unexpected object: %#v", obj) + } + return obj, nil + }) + + must := func(err error) { + if err != nil { + t.Fatal(err) + } + } + mustTransform := func(obj interface{}) interface{} { + ret, err := xfrm(obj) + must(err) + return ret + } + + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + xfrm, + ) + must(f.Add(mk("foo", 10))) + must(f.Add(mk("bar", 11))) + must(f.Update(mk("foo", 12))) + must(f.Delete(mk("foo", 15))) + must(f.Replace([]interface{}{}, "")) + must(f.Add(mk("bar", 16))) + must(f.Replace([]interface{}{}, "")) + + // ATTENTION: difference with delta_fifo_test, without compression, we keep all the items, including bar being deleted multiple times. + // DeltaFIFO starts by checking keys, we start by checking types and keys + expected1 := []Delta{ + {Type: Added, Object: mustTransform(mk("foo", 10))}, + {Type: Added, Object: mustTransform(mk("bar", 11))}, + {Type: Updated, Object: mustTransform(mk("foo", 12))}, + {Type: Deleted, Object: mustTransform(mk("foo", 15))}, + {Type: Deleted, Object: DeletedFinalStateUnknown{Key: "bar", Obj: mustTransform(mk("bar", 11))}}, + {Type: Added, Object: mustTransform(mk("bar", 16))}, + {Type: Deleted, Object: DeletedFinalStateUnknown{Key: "bar", Obj: mustTransform(mk("bar", 16))}}, + } + actual1 := f.getItems() + if len(expected1) != len(actual1) { + t.Fatalf("Expected %+v, got %+v", expected1, actual1) + } + for i := 0; i < len(actual1); i++ { + e := expected1[i] + a := actual1[i] + if e.Type != a.Type { + t.Errorf("%d Expected %+v, got %+v", i, e, a) + } + eKey, err := f.keyOf(e) + if err != nil { + t.Fatal(err) + } + aKey, err := f.keyOf(a) + if err != nil { + t.Fatal(err) + } + if eKey != aKey { + t.Errorf("%d Expected %+v, got %+v", i, eKey, aKey) + } + } + + for i := 0; i < len(expected1); i++ { + obj, err := f.Pop(func(o interface{}, isInInitialList bool) error { return nil }) + if err != nil { + t.Fatalf("got nothing on try %v?", i) + } + a := obj.(Deltas)[0] + e := expected1[i] + if !reflect.DeepEqual(e, a) { + t.Errorf("%d Expected %+v, got %+v", i, e, a) + } + } +} + +func TestRealFIFO_enqueueingNoLister(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("bar", 15)) + f.Add(mkFifoObj("qux", 17)) + f.Delete(mkFifoObj("qux", 18)) + + // RealFIFO queues everything + f.Delete(mkFifoObj("baz", 20)) + + // ATTENTION: difference with delta_fifo_test, without compression every item is queued + expectList := []int{10, 15, 17, 18, 20} + for _, expect := range expectList { + if e, a := expect, testRealFIFOPop(f).val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + } + if e, a := 0, len(f.getItems()); e != a { + t.Errorf("queue unexpectedly not empty: %v != %v\n%#v", e, a, f.getItems()) + } +} + +func TestRealFIFO_enqueueingWithLister(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Update(mkFifoObj("bar", 15)) + + // This delete does enqueue the deletion, because "baz" is in the key lister. + f.Delete(mkFifoObj("baz", 20)) + + expectList := []int{10, 15, 20} + for _, expect := range expectList { + if e, a := expect, testRealFIFOPop(f).val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + } + if e, a := 0, len(f.items); e != a { + t.Errorf("queue unexpectedly not empty: %v != %v", e, a) + } +} + +func TestRealFIFO_addReplace(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 15)}, "0") + got := make(chan testFifoObject, 3) + done := make(chan struct{}) + go func() { + defer close(done) + for { + obj := testRealFIFOPop(f) + if obj.name == closedFIFOName { + break + } + got <- obj + } + }() + + // ATTENTION: difference with delta_fifo_test, we get every event instead of the .Newest making us skip some for the test, but not at runtime. + curr := <-got + if e, a := 10, curr.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + curr = <-got + if e, a := 15, curr.val; e != a { + t.Errorf("Didn't get updated value (%v), got %v", e, a) + } + + select { + case unexpected := <-got: + t.Errorf("Got second value %v", unexpected.val) + case <-time.After(50 * time.Millisecond): + } + + if items := f.getItems(); len(items) > 0 { + t.Errorf("item did not get removed") + } + f.Close() + <-done +} + +func TestRealFIFO_ResyncNonExisting(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + f.Delete(mkFifoObj("foo", 10)) + f.Resync() + + deltas := f.getItems() + if len(deltas) != 1 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[0].Type != Deleted { + t.Errorf("unexpected delta: %v", deltas[0]) + } +} + +func TestRealFIFO_Resync(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + f.Resync() + + deltas := f.getItems() + if len(deltas) != 1 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[0].Type != Sync { + t.Errorf("unexpected delta: %v", deltas[0]) + } +} + +func TestRealFIFO_DeleteExistingNonPropagated(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("foo", 5)) + f.Delete(mkFifoObj("foo", 6)) + + deltas := f.getItems() + if len(deltas) != 2 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[len(deltas)-1].Type != Deleted { + t.Errorf("unexpected delta: %v", deltas[len(deltas)-1]) + } +} + +func TestRealFIFO_ReplaceMakesDeletions(t *testing.T) { + // We test with only one pre-existing object because there is no + // promise about how their deletes are ordered. + + // Try it with a pre-existing Delete + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Delete(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + {{Deleted, mkFifoObj("baz", 10)}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try starting with an Add instead of a Delete + f = NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Add(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + // ATTENTION: difference with delta_fifo_test, every event is its own Deltas with one item + expectedList = []Deltas{ + {{Added, mkFifoObj("baz", 10)}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try deleting and recreating the object in the queue, then delete it by a Replace call + f = NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Delete(mkFifoObj("bar", 6)) + f.Add(mkFifoObj("bar", 100)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + // ATTENTION: difference with delta_fifo_test, every event is its own Deltas with one item + expectedList = []Deltas{ + {{Deleted, mkFifoObj("bar", 6)}}, + {{Added, mkFifoObj("bar", 100)}}, + // Since "bar" has a newer object in the queue than in the state, + // it should get a tombstone key with the latest object from the queue + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } + + // Now try syncing it first to ensure the delete use the latest version + f = NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Replace([]interface{}{mkFifoObj("bar", 100), mkFifoObj("foo", 5)}, "0") + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + // ATTENTION: difference with delta_fifo_test, every event is its own Deltas with one item + // ATTENTION: difference with delta_fifo_test, deltaFifo associated by key, but realFIFO orders across all keys, so this ordering changed + expectedList = []Deltas{ + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "baz" didn't have a delete event and wasn't in the Replace list + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {{Replaced, mkFifoObj("bar", 100)}}, + {{Replaced, mkFifoObj("foo", 5)}}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 100)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for i, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("%d Expected %#v, got %#v", i, e, a) + } + } + + // Now try starting without an explicit KeyListerGetter + f = NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + f.Add(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList = []Deltas{ + {{Added, mkFifoObj("baz", 10)}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 10)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +// TestRealFIFO_ReplaceMakesDeletionsReplaced is the same as the above test, but +// ensures that a Replaced DeltaType is emitted. +func TestRealFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + + f.Delete(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 6)}, "0") + + expectedList := []Deltas{ + {{Deleted, mkFifoObj("baz", 10)}}, + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Replaced, mkFifoObj("foo", 6)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +// ATTENTION: difference with delta_fifo_test, the previous value was hardcoded as use "Replace" so I've eliminated the option to set it differently +//func TestRealFIFO_ReplaceDeltaType(t *testing.T) { + +func TestRealFIFO_UpdateResyncRace(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + f.Update(mkFifoObj("foo", 6)) + f.Resync() + + expectedList := []Deltas{ + {{Updated, mkFifoObj("foo", 6)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +func TestRealFIFO_HasSyncedCorrectOnDeletion(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + nil, + ) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + // ATTENTION: difference with delta_fifo_test, logically the deletes of known items should happen BEFORE newItems are added, so this delete happens early now + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + {{Deleted, DeletedFinalStateUnknown{Key: "baz", Obj: mkFifoObj("baz", 7)}}}, + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + if f.HasSynced() { + t.Errorf("Expected HasSynced to be false") + } + cur, initial := pop2[Deltas](f) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + if initial != true { + t.Error("Expected initial list item") + } + } + if !f.HasSynced() { + t.Errorf("Expected HasSynced to be true") + } +} + +func TestRealFIFO_detectLineJumpers(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + + f.Add(mkFifoObj("foo", 10)) + f.Add(mkFifoObj("bar", 1)) + f.Add(mkFifoObj("foo", 11)) + f.Add(mkFifoObj("foo", 13)) + f.Add(mkFifoObj("zab", 30)) + + // ATTENTION: difference with delta_fifo_test, every event is delivered in order + + if e, a := 10, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + + f.Add(mkFifoObj("foo", 14)) // ensure foo doesn't jump back in line + + if e, a := 1, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 11, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 13, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 30, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } + if e, a := 14, testRealFIFOPop(f).val; a != e { + t.Fatalf("expected %d, got %d", e, a) + } +} + +func TestRealFIFO_KeyOf(t *testing.T) { + f := RealFIFO{keyFunc: testFifoObjectKeyFunc} + + table := []struct { + obj interface{} + key string + }{ + {obj: testFifoObject{name: "A"}, key: "A"}, + {obj: DeletedFinalStateUnknown{Key: "B", Obj: nil}, key: "B"}, + {obj: Deltas{{Object: testFifoObject{name: "C"}}}, key: "C"}, + {obj: Deltas{{Object: DeletedFinalStateUnknown{Key: "D", Obj: nil}}}, key: "D"}, + } + + for _, item := range table { + got, err := f.keyOf(item.obj) + if err != nil { + t.Errorf("Unexpected error for %q: %v", item.obj, err) + continue + } + if e, a := item.key, got; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +} + +func TestRealFIFO_HasSynced(t *testing.T) { + tests := []struct { + actions []func(f *RealFIFO) + expectedSynced bool + }{ + { + actions: []func(f *RealFIFO){}, + expectedSynced: false, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Add(mkFifoObj("a", 1)) }, + }, + expectedSynced: true, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{}, "0") }, + }, + expectedSynced: true, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + }, + expectedSynced: false, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + func(f *RealFIFO) { Pop(f) }, + }, + expectedSynced: false, + }, + { + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + func(f *RealFIFO) { Pop(f) }, + func(f *RealFIFO) { Pop(f) }, + }, + expectedSynced: true, + }, + { + // This test case won't happen in practice since a Reflector, the only producer for delta_fifo today, always passes a complete snapshot consistent in time; + // there cannot be duplicate keys in the list or apiserver is broken. + actions: []func(f *RealFIFO){ + func(f *RealFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("a", 2)}, "0") }, + func(f *RealFIFO) { Pop(f) }, + // ATTENTION: difference with delta_fifo_test, every event is delivered, so a is listed twice and must be popped twice to remove both + func(f *RealFIFO) { Pop(f) }, + }, + expectedSynced: true, + }, + } + + for i, test := range tests { + f := NewRealFIFO( + testFifoObjectKeyFunc, + emptyKnownObjects(), + nil, + ) + + for _, action := range test.actions { + action(f) + } + if e, a := test.expectedSynced, f.HasSynced(); a != e { + t.Errorf("test case %v failed, expected: %v , got %v", i, e, a) + } + } +} + +// TestRealFIFO_PopShouldUnblockWhenClosed checks that any blocking Pop on an empty queue +// should unblock and return after Close is called. +func TestRealFIFO_PopShouldUnblockWhenClosed(t *testing.T) { + f := NewRealFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + nil, + ) + + c := make(chan struct{}) + const jobs = 10 + for i := 0; i < jobs; i++ { + go func() { + f.Pop(func(obj interface{}, isInInitialList bool) error { + return nil + }) + c <- struct{}{} + }() + } + + runtime.Gosched() + f.Close() + + for i := 0; i < jobs; i++ { + select { + case <-c: + case <-time.After(500 * time.Millisecond): + t.Fatalf("timed out waiting for Pop to return after Close") + } + } +}