From 802190f49ee63b062e59625a65bf2c78c8715edc Mon Sep 17 00:00:00 2001 From: Casey Callendrello Date: Fri, 6 Dec 2019 22:19:42 +0100 Subject: [PATCH 1/2] client-go/cache/testing: add ability to simulate watch disruption This adds ResetWatch() to the FakeControllerSource, which lets the controller simulate a re-list-and-watch. Kubernetes-commit: 5aacacbdf000cee2d0ec548ee4afe564f35c60bf --- tools/cache/testing/fake_controller_source.go | 52 ++++++++++++++----- .../testing/fake_controller_source_test.go | 43 ++++++++++++++- 2 files changed, 82 insertions(+), 13 deletions(-) diff --git a/tools/cache/testing/fake_controller_source.go b/tools/cache/testing/fake_controller_source.go index 24362801..16e66fc6 100644 --- a/tools/cache/testing/fake_controller_source.go +++ b/tools/cache/testing/fake_controller_source.go @@ -18,11 +18,13 @@ package framework import ( "errors" + "fmt" "math/rand" "strconv" "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -59,6 +61,7 @@ type FakeControllerSource struct { Items map[nnu]runtime.Object changes []watch.Event // one change per resourceVersion Broadcaster *watch.Broadcaster + lastRV int } type FakePVControllerSource struct { @@ -75,6 +78,16 @@ type nnu struct { uid types.UID } +// ResetWatch simulates connection problems; creates a new Broadcaster and flushes +// the change queue so that clients have to re-list and watch. +func (f *FakeControllerSource) ResetWatch() { + f.lock.Lock() + defer f.lock.Unlock() + f.Broadcaster.Shutdown() + f.Broadcaster = watch.NewBroadcaster(100, watch.WaitIfChannelFull) + f.changes = []watch.Event{} +} + // Add adds an object to the set and sends an add event to watchers. // obj's ResourceVersion is set. func (f *FakeControllerSource) Add(obj runtime.Object) { @@ -129,8 +142,8 @@ func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) { panic(err) // this is test code only } - resourceVersion := len(f.changes) + 1 - accessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + f.lastRV += 1 + accessor.SetResourceVersion(strconv.Itoa(f.lastRV)) f.changes = append(f.changes, e) key := f.key(accessor) switch e.Type { @@ -173,8 +186,7 @@ func (f *FakeControllerSource) List(options metav1.ListOptions) (runtime.Object, if err != nil { return nil, err } - resourceVersion := len(f.changes) - listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV)) return listObj, nil } @@ -194,8 +206,7 @@ func (f *FakePVControllerSource) List(options metav1.ListOptions) (runtime.Objec if err != nil { return nil, err } - resourceVersion := len(f.changes) - listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV)) return listObj, nil } @@ -215,8 +226,7 @@ func (f *FakePVCControllerSource) List(options metav1.ListOptions) (runtime.Obje if err != nil { return nil, err } - resourceVersion := len(f.changes) - listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV)) return listObj, nil } @@ -229,9 +239,27 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac if err != nil { return nil, err } - if rc < len(f.changes) { + if rc < f.lastRV { + // if the change queue was flushed... + if len(f.changes) == 0 { + return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", rc, f.lastRV)) + } + + // get the RV of the oldest object in the change queue + oldestRV, err := meta.NewAccessor().ResourceVersion(f.changes[0].Object) + if err != nil { + panic(err) + } + oldestRC, err := strconv.Atoi(oldestRV) + if err != nil { + panic(err) + } + if rc < oldestRC { + return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", rc, oldestRC)) + } + changes := []watch.Event{} - for _, c := range f.changes[rc:] { + for _, c := range f.changes[rc-oldestRC+1:] { // Must make a copy to allow clients to modify the // object. Otherwise, if they make a change and write // it back, they will inadvertently change the our @@ -240,7 +268,7 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()}) } return f.Broadcaster.WatchWithPrefix(changes), nil - } else if rc > len(f.changes) { + } else if rc > f.lastRV { return nil, errors.New("resource version in the future not supported by this fake") } return f.Broadcaster.Watch(), nil diff --git a/tools/cache/testing/fake_controller_source_test.go b/tools/cache/testing/fake_controller_source_test.go index e5097c7a..817d45cd 100644 --- a/tools/cache/testing/fake_controller_source_test.go +++ b/tools/cache/testing/fake_controller_source_test.go @@ -20,7 +20,7 @@ import ( "sync" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" ) @@ -93,3 +93,44 @@ func TestRCNumber(t *testing.T) { source.Shutdown() wg.Wait() } + +// TestResetWatch validates that the FakeController correctly mocks a watch +// falling behind and ResourceVersions aging out. +func TestResetWatch(t *testing.T) { + pod := func(name string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + source := NewFakeControllerSource() + source.Add(pod("foo")) // RV = 1 + source.Modify(pod("foo")) // RV = 2 + source.Modify(pod("foo")) // RV = 3 + + // Kill watch, delete change history + source.ResetWatch() + + // This should fail, RV=1 was lost with ResetWatch + _, err := source.Watch(metav1.ListOptions{ResourceVersion: "1"}) + if err == nil { + t.Fatalf("Unexpected non-error") + } + + // This should succeed, RV=3 is current + w, err := source.Watch(metav1.ListOptions{ResourceVersion: "3"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Modify again, ensure the watch is still working + source.Modify(pod("foo")) + go consume(t, w, []string{"4"}, wg) + source.Shutdown() + wg.Wait() +} From b775e00fe53e0b30886ba5faf7885be0db390676 Mon Sep 17 00:00:00 2001 From: Casey Callendrello Date: Mon, 16 Dec 2019 18:34:30 +0100 Subject: [PATCH 2/2] informers: don't treat relist same as sync Background: Before this change, DeltaFIFO emits the Sync DeltaType on Resync() and Replace(). Seperately, the SharedInformer will only pass that event on to handlers that have a ResyncInterval and are due for Resync. This can cause updates to be lost if an object changes as part of the Replace(), as it may be incorrectly discarded if the handler does not want a Resync. What this change does: Creates a new DeltaType, Replaced, which is emitted by DeltaFIFO on Replace(). For backwards compatability concerns, the old behavior of always emitting Sync is preserved unless explicity overridden. As a result, if an object changes (or is added) on Replace(), now all SharedInformer handlers will get a correct Add() or Update() notification. One additional side-effect is that handlers which do not ever want Resyncs will now see them for all objects that have not changed during the Replace. Kubernetes-commit: ca1eeb99b530a6d76b464dad545abc18d4508c49 --- tools/cache/controller.go | 7 ++- tools/cache/delta_fifo.go | 83 +++++++++++++++++++++++++---- tools/cache/delta_fifo_test.go | 72 +++++++++++++++++++++++++ tools/cache/shared_informer.go | 11 ++-- tools/cache/shared_informer_test.go | 69 +++++++++++++++++++++++- 5 files changed, 226 insertions(+), 16 deletions(-) diff --git a/tools/cache/controller.go b/tools/cache/controller.go index f1a7989c..5d582119 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -364,7 +364,10 @@ func newInformer( // This will hold incoming changes. Note how we pass clientState in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState) + fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: clientState, + EmitDeltaTypeReplaced: true, + }) cfg := &Config{ Queue: fifo, @@ -377,7 +380,7 @@ func newInformer( // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { - case Sync, Added, Updated: + case Sync, Replaced, Added, Updated: if old, exists, err := clientState.Get(d.Object); err == nil && exists { if err := clientState.Update(d.Object); err != nil { return err diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 90ed63c3..40b6022c 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -29,7 +29,8 @@ import ( // NewDeltaFIFO returns a Queue which can be used to process changes to items. // // keyFunc is used to figure out what key an object should have. (It is -// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.) +// exposed in the returned DeltaFIFO's KeyOf() method, with additional handling +// around deleted objects and queue state). // // 'knownObjects' may be supplied to modify the behavior of Delete, // Replace, and Resync. It may be nil if you do not need those @@ -56,12 +57,62 @@ import ( // and internal tests. // // Also see the comment on DeltaFIFO. +// +// Warning: This constructs a DeltaFIFO that does not differentiate between +// events caused by a call to Replace (e.g., from a relist, which may +// contain object updates), and synthetic events caused by a periodic resync +// (which just emit the existing object). See https://issue.k8s.io/86015 for details. +// +// Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` +// instead to receive a `Replaced` event depending on the type. +// +// Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects}) func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { + return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: keyFunc, + KnownObjects: knownObjects, + }) +} + +// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are +// optional. +type DeltaFIFOOptions struct { + + // KeyFunction is used to figure out what key an object should have. (It's + // exposed in the returned DeltaFIFO's KeyOf() method, with additional + // handling around deleted objects and queue state). + // Optional, the default is MetaNamespaceKeyFunc. + KeyFunction KeyFunc + + // KnownObjects is expected to return a list of keys that the consumer of + // this queue "knows about". It is used to decide which items are missing + // when Replace() is called; 'Deleted' deltas are produced for the missing items. + // KnownObjects may be nil if you can tolerate missing deletions on Replace(). + KnownObjects KeyListerGetter + + // EmitDeltaTypeReplaced indicates that the queue consumer + // understands the Replaced DeltaType. Before the `Replaced` event type was + // added, calls to Replace() were handled the same as Sync(). For + // backwards-compatibility purposes, this is false by default. + // When true, `Replaced` events will be sent for items passed to a Replace() call. + // When false, `Sync` events will be sent instead. + EmitDeltaTypeReplaced bool +} + +// NewDeltaFIFOWithOptions returns a Store which can be used process changes to +// items. See also the comment on DeltaFIFO. +func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { + if opts.KeyFunction == nil { + opts.KeyFunction = MetaNamespaceKeyFunc + } + f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, - keyFunc: keyFunc, - knownObjects: knownObjects, + keyFunc: opts.KeyFunction, + knownObjects: opts.KnownObjects, + + emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, } f.cond.L = &f.lock return f @@ -134,6 +185,10 @@ type DeltaFIFO struct { // Currently, not used to gate any of CRED operations. closed bool closedLock sync.Mutex + + // emitDeltaTypeReplaced is whether to emit the Replaced or Sync + // DeltaType when Replace() is called (to preserve backwards compat). + emitDeltaTypeReplaced bool } var ( @@ -446,7 +501,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { } // Replace atomically does two things: (1) it adds the given objects -// using the Sync type of Delta and then (2) it does some deletions. +// using the Sync or Replace DeltaType and then (2) it does some deletions. // In particular: for every pre-existing key K that is not the key of // an object in `list` there is the effect of // `Delete(DeletedFinalStateUnknown{K, O})` where O is current object @@ -460,13 +515,19 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { defer f.lock.Unlock() keys := make(sets.String, len(list)) + // keep backwards compat for old clients + action := Sync + if f.emitDeltaTypeReplaced { + action = Replaced + } + for _, item := range list { key, err := f.KeyOf(item) if err != nil { return KeyError{item, err} } keys.Insert(key) - if err := f.queueActionLocked(Sync, item); err != nil { + if err := f.queueActionLocked(action, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } @@ -600,10 +661,14 @@ const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" - // The other types are obvious. You'll get Sync deltas when: - // * A watch expires/errors out and a new list/watch cycle is started. - // * You've turned on periodic syncs. - // (Anything that trigger's DeltaFIFO's Replace() method.) + // Replaced is emitted when we encountered watch errors and had to do a + // relist. We don't know if the replaced object has changed. + // + // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events + // as well. Hence, Replaced is only emitted when the option + // EmitDeltaTypeReplaced is true. + Replaced DeltaType = "Replaced" + // Sync is for synthetic events during a periodic resync. Sync DeltaType = "Sync" ) diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 65e7922b..32cc96ac 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -288,6 +288,24 @@ func TestDeltaFIFO_ResyncNonExisting(t *testing.T) { } } +func TestDeltaFIFO_Resync(t *testing.T) { + f := NewDeltaFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + ) + f.Resync() + + deltas := f.items["foo"] + if len(deltas) != 1 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[0].Type != Sync { + t.Errorf("unexpected delta: %v", deltas[0]) + } +} + func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, @@ -384,6 +402,60 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } } +// TestDeltaFIFO_ReplaceMakesDeletionsReplaced is the same as the above test, but +// ensures that a Replaced DeltaType is emitted. +func TestDeltaFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + EmitDeltaTypeReplaced: true, + }) + + f.Delete(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 6)}, "0") + + expectedList := []Deltas{ + {{Deleted, mkFifoObj("baz", 10)}}, + {{Replaced, mkFifoObj("foo", 6)}}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +// TestDeltaFIFO_ReplaceDeltaType checks that passing EmitDeltaTypeReplaced +// means that Replaced is correctly emitted. +func TestDeltaFIFO_ReplaceDeltaType(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + EmitDeltaTypeReplaced: true, + }) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index d355e848..f7dc1345 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -319,7 +319,10 @@ type deleteNotification struct { func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) + fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: s.indexer, + EmitDeltaTypeReplaced: true, + }) cfg := &Config{ Queue: fifo, @@ -478,19 +481,19 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { - case Sync, Added, Updated: - isSync := d.Type == Sync + case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } + isSync := d.Type == Sync s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } - s.processor.distribute(addNotification{newObj: d.Object}, isSync) + s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 22e4a90d..26a7a162 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -92,7 +92,7 @@ func (l *testListener) satisfiedExpectations() bool { l.lock.RLock() defer l.lock.RUnlock() - return len(l.receivedItemNames) == l.expectedItemNames.Len() && sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames) + return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames) } func TestListenerResyncPeriods(t *testing.T) { @@ -263,3 +263,70 @@ func TestSharedInformerInitializationRace(t *testing.T) { go informer.Run(stop) close(stop) } + +// TestSharedInformerWatchDisruption simulates a watch that was closed +// with updates to the store during that time. We ensure that handlers with +// resync and no resync see the expected state. +func TestSharedInformerWatchDisruption(t *testing.T) { + // source simulates an apiserver object endpoint. + source := fcache.NewFakeControllerSource() + + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1"}}) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}}) + + // create the shared informer and resync every 1s + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + + clock := clock.NewFakeClock(time.Now()) + informer.clock = clock + informer.processor.clock = clock + + // listener, never resync + listenerNoResync := newTestListener("listenerNoResync", 0, "pod1", "pod2") + informer.AddEventHandlerWithResyncPeriod(listenerNoResync, listenerNoResync.resyncPeriod) + + listenerResync := newTestListener("listenerResync", 1*time.Second, "pod1", "pod2") + informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod) + listeners := []*testListener{listenerNoResync, listenerResync} + + stop := make(chan struct{}) + defer close(stop) + + go informer.Run(stop) + + for _, listener := range listeners { + if !listener.ok() { + t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames) + } + } + + // Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist + source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3"}}) + source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}}) + + // Ensure that nobody saw any changes + for _, listener := range listeners { + if !listener.ok() { + t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames) + } + } + + for _, listener := range listeners { + listener.receivedItemNames = []string{} + } + + listenerNoResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3") + listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3") + + // This calls shouldSync, which deletes noResync from the list of syncingListeners + clock.Step(1 * time.Second) + + // Simulate a connection loss (or even just a too-old-watch) + source.ResetWatch() + + for _, listener := range listeners { + if !listener.ok() { + t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames) + } + } +}