diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index f1a7989c652..5d582119392 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -364,7 +364,10 @@ func newInformer( // This will hold incoming changes. Note how we pass clientState in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState) + fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: clientState, + EmitDeltaTypeReplaced: true, + }) cfg := &Config{ Queue: fifo, @@ -377,7 +380,7 @@ func newInformer( // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { - case Sync, Added, Updated: + case Sync, Replaced, Added, Updated: if old, exists, err := clientState.Get(d.Object); err == nil && exists { if err := clientState.Update(d.Object); err != nil { return err diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 90ed63c32db..40b6022c096 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -29,7 +29,8 @@ import ( // NewDeltaFIFO returns a Queue which can be used to process changes to items. // // keyFunc is used to figure out what key an object should have. (It is -// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.) +// exposed in the returned DeltaFIFO's KeyOf() method, with additional handling +// around deleted objects and queue state). // // 'knownObjects' may be supplied to modify the behavior of Delete, // Replace, and Resync. It may be nil if you do not need those @@ -56,12 +57,62 @@ import ( // and internal tests. // // Also see the comment on DeltaFIFO. +// +// Warning: This constructs a DeltaFIFO that does not differentiate between +// events caused by a call to Replace (e.g., from a relist, which may +// contain object updates), and synthetic events caused by a periodic resync +// (which just emit the existing object). See https://issue.k8s.io/86015 for details. +// +// Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` +// instead to receive a `Replaced` event depending on the type. +// +// Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects}) func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { + return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: keyFunc, + KnownObjects: knownObjects, + }) +} + +// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are +// optional. +type DeltaFIFOOptions struct { + + // KeyFunction is used to figure out what key an object should have. (It's + // exposed in the returned DeltaFIFO's KeyOf() method, with additional + // handling around deleted objects and queue state). + // Optional, the default is MetaNamespaceKeyFunc. + KeyFunction KeyFunc + + // KnownObjects is expected to return a list of keys that the consumer of + // this queue "knows about". It is used to decide which items are missing + // when Replace() is called; 'Deleted' deltas are produced for the missing items. + // KnownObjects may be nil if you can tolerate missing deletions on Replace(). + KnownObjects KeyListerGetter + + // EmitDeltaTypeReplaced indicates that the queue consumer + // understands the Replaced DeltaType. Before the `Replaced` event type was + // added, calls to Replace() were handled the same as Sync(). For + // backwards-compatibility purposes, this is false by default. + // When true, `Replaced` events will be sent for items passed to a Replace() call. + // When false, `Sync` events will be sent instead. + EmitDeltaTypeReplaced bool +} + +// NewDeltaFIFOWithOptions returns a Store which can be used process changes to +// items. See also the comment on DeltaFIFO. +func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { + if opts.KeyFunction == nil { + opts.KeyFunction = MetaNamespaceKeyFunc + } + f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, - keyFunc: keyFunc, - knownObjects: knownObjects, + keyFunc: opts.KeyFunction, + knownObjects: opts.KnownObjects, + + emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, } f.cond.L = &f.lock return f @@ -134,6 +185,10 @@ type DeltaFIFO struct { // Currently, not used to gate any of CRED operations. closed bool closedLock sync.Mutex + + // emitDeltaTypeReplaced is whether to emit the Replaced or Sync + // DeltaType when Replace() is called (to preserve backwards compat). + emitDeltaTypeReplaced bool } var ( @@ -446,7 +501,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { } // Replace atomically does two things: (1) it adds the given objects -// using the Sync type of Delta and then (2) it does some deletions. +// using the Sync or Replace DeltaType and then (2) it does some deletions. // In particular: for every pre-existing key K that is not the key of // an object in `list` there is the effect of // `Delete(DeletedFinalStateUnknown{K, O})` where O is current object @@ -460,13 +515,19 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { defer f.lock.Unlock() keys := make(sets.String, len(list)) + // keep backwards compat for old clients + action := Sync + if f.emitDeltaTypeReplaced { + action = Replaced + } + for _, item := range list { key, err := f.KeyOf(item) if err != nil { return KeyError{item, err} } keys.Insert(key) - if err := f.queueActionLocked(Sync, item); err != nil { + if err := f.queueActionLocked(action, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } @@ -600,10 +661,14 @@ const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" - // The other types are obvious. You'll get Sync deltas when: - // * A watch expires/errors out and a new list/watch cycle is started. - // * You've turned on periodic syncs. - // (Anything that trigger's DeltaFIFO's Replace() method.) + // Replaced is emitted when we encountered watch errors and had to do a + // relist. We don't know if the replaced object has changed. + // + // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events + // as well. Hence, Replaced is only emitted when the option + // EmitDeltaTypeReplaced is true. + Replaced DeltaType = "Replaced" + // Sync is for synthetic events during a periodic resync. Sync DeltaType = "Sync" ) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go index 65e7922b9e2..32cc96ac7a1 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go @@ -288,6 +288,24 @@ func TestDeltaFIFO_ResyncNonExisting(t *testing.T) { } } +func TestDeltaFIFO_Resync(t *testing.T) { + f := NewDeltaFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + ) + f.Resync() + + deltas := f.items["foo"] + if len(deltas) != 1 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[0].Type != Sync { + t.Errorf("unexpected delta: %v", deltas[0]) + } +} + func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, @@ -384,6 +402,60 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } } +// TestDeltaFIFO_ReplaceMakesDeletionsReplaced is the same as the above test, but +// ensures that a Replaced DeltaType is emitted. +func TestDeltaFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + EmitDeltaTypeReplaced: true, + }) + + f.Delete(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 6)}, "0") + + expectedList := []Deltas{ + {{Deleted, mkFifoObj("baz", 10)}}, + {{Replaced, mkFifoObj("foo", 6)}}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +// TestDeltaFIFO_ReplaceDeltaType checks that passing EmitDeltaTypeReplaced +// means that Replaced is correctly emitted. +func TestDeltaFIFO_ReplaceDeltaType(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + EmitDeltaTypeReplaced: true, + }) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index d355e848ad9..f7dc1345de4 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -319,7 +319,10 @@ type deleteNotification struct { func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) + fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: s.indexer, + EmitDeltaTypeReplaced: true, + }) cfg := &Config{ Queue: fifo, @@ -478,19 +481,19 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { - case Sync, Added, Updated: - isSync := d.Type == Sync + case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } + isSync := d.Type == Sync s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } - s.processor.distribute(addNotification{newObj: d.Object}, isSync) + s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 22e4a90ddd7..26a7a162414 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -92,7 +92,7 @@ func (l *testListener) satisfiedExpectations() bool { l.lock.RLock() defer l.lock.RUnlock() - return len(l.receivedItemNames) == l.expectedItemNames.Len() && sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames) + return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames) } func TestListenerResyncPeriods(t *testing.T) { @@ -263,3 +263,70 @@ func TestSharedInformerInitializationRace(t *testing.T) { go informer.Run(stop) close(stop) } + +// TestSharedInformerWatchDisruption simulates a watch that was closed +// with updates to the store during that time. We ensure that handlers with +// resync and no resync see the expected state. +func TestSharedInformerWatchDisruption(t *testing.T) { + // source simulates an apiserver object endpoint. + source := fcache.NewFakeControllerSource() + + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1"}}) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}}) + + // create the shared informer and resync every 1s + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + + clock := clock.NewFakeClock(time.Now()) + informer.clock = clock + informer.processor.clock = clock + + // listener, never resync + listenerNoResync := newTestListener("listenerNoResync", 0, "pod1", "pod2") + informer.AddEventHandlerWithResyncPeriod(listenerNoResync, listenerNoResync.resyncPeriod) + + listenerResync := newTestListener("listenerResync", 1*time.Second, "pod1", "pod2") + informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod) + listeners := []*testListener{listenerNoResync, listenerResync} + + stop := make(chan struct{}) + defer close(stop) + + go informer.Run(stop) + + for _, listener := range listeners { + if !listener.ok() { + t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames) + } + } + + // Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist + source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3"}}) + source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}}) + + // Ensure that nobody saw any changes + for _, listener := range listeners { + if !listener.ok() { + t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames) + } + } + + for _, listener := range listeners { + listener.receivedItemNames = []string{} + } + + listenerNoResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3") + listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3") + + // This calls shouldSync, which deletes noResync from the list of syncingListeners + clock.Step(1 * time.Second) + + // Simulate a connection loss (or even just a too-old-watch) + source.ResetWatch() + + for _, listener := range listeners { + if !listener.ok() { + t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames) + } + } +}