diff --git a/tools/cache/controller.go b/tools/cache/controller.go index f1a7989c..5d582119 100644 --- a/tools/cache/controller.go +++ b/tools/cache/controller.go @@ -364,7 +364,10 @@ func newInformer( // This will hold incoming changes. Note how we pass clientState in as a // KeyLister, that way resync operations will result in the correct set // of update/delete deltas. - fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState) + fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: clientState, + EmitDeltaTypeReplaced: true, + }) cfg := &Config{ Queue: fifo, @@ -377,7 +380,7 @@ func newInformer( // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { - case Sync, Added, Updated: + case Sync, Replaced, Added, Updated: if old, exists, err := clientState.Get(d.Object); err == nil && exists { if err := clientState.Update(d.Object); err != nil { return err diff --git a/tools/cache/delta_fifo.go b/tools/cache/delta_fifo.go index 90ed63c3..40b6022c 100644 --- a/tools/cache/delta_fifo.go +++ b/tools/cache/delta_fifo.go @@ -29,7 +29,8 @@ import ( // NewDeltaFIFO returns a Queue which can be used to process changes to items. // // keyFunc is used to figure out what key an object should have. (It is -// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.) +// exposed in the returned DeltaFIFO's KeyOf() method, with additional handling +// around deleted objects and queue state). // // 'knownObjects' may be supplied to modify the behavior of Delete, // Replace, and Resync. It may be nil if you do not need those @@ -56,12 +57,62 @@ import ( // and internal tests. // // Also see the comment on DeltaFIFO. +// +// Warning: This constructs a DeltaFIFO that does not differentiate between +// events caused by a call to Replace (e.g., from a relist, which may +// contain object updates), and synthetic events caused by a periodic resync +// (which just emit the existing object). See https://issue.k8s.io/86015 for details. +// +// Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` +// instead to receive a `Replaced` event depending on the type. +// +// Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects}) func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { + return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: keyFunc, + KnownObjects: knownObjects, + }) +} + +// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are +// optional. +type DeltaFIFOOptions struct { + + // KeyFunction is used to figure out what key an object should have. (It's + // exposed in the returned DeltaFIFO's KeyOf() method, with additional + // handling around deleted objects and queue state). + // Optional, the default is MetaNamespaceKeyFunc. + KeyFunction KeyFunc + + // KnownObjects is expected to return a list of keys that the consumer of + // this queue "knows about". It is used to decide which items are missing + // when Replace() is called; 'Deleted' deltas are produced for the missing items. + // KnownObjects may be nil if you can tolerate missing deletions on Replace(). + KnownObjects KeyListerGetter + + // EmitDeltaTypeReplaced indicates that the queue consumer + // understands the Replaced DeltaType. Before the `Replaced` event type was + // added, calls to Replace() were handled the same as Sync(). For + // backwards-compatibility purposes, this is false by default. + // When true, `Replaced` events will be sent for items passed to a Replace() call. + // When false, `Sync` events will be sent instead. + EmitDeltaTypeReplaced bool +} + +// NewDeltaFIFOWithOptions returns a Store which can be used process changes to +// items. See also the comment on DeltaFIFO. +func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { + if opts.KeyFunction == nil { + opts.KeyFunction = MetaNamespaceKeyFunc + } + f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, - keyFunc: keyFunc, - knownObjects: knownObjects, + keyFunc: opts.KeyFunction, + knownObjects: opts.KnownObjects, + + emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, } f.cond.L = &f.lock return f @@ -134,6 +185,10 @@ type DeltaFIFO struct { // Currently, not used to gate any of CRED operations. closed bool closedLock sync.Mutex + + // emitDeltaTypeReplaced is whether to emit the Replaced or Sync + // DeltaType when Replace() is called (to preserve backwards compat). + emitDeltaTypeReplaced bool } var ( @@ -446,7 +501,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { } // Replace atomically does two things: (1) it adds the given objects -// using the Sync type of Delta and then (2) it does some deletions. +// using the Sync or Replace DeltaType and then (2) it does some deletions. // In particular: for every pre-existing key K that is not the key of // an object in `list` there is the effect of // `Delete(DeletedFinalStateUnknown{K, O})` where O is current object @@ -460,13 +515,19 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { defer f.lock.Unlock() keys := make(sets.String, len(list)) + // keep backwards compat for old clients + action := Sync + if f.emitDeltaTypeReplaced { + action = Replaced + } + for _, item := range list { key, err := f.KeyOf(item) if err != nil { return KeyError{item, err} } keys.Insert(key) - if err := f.queueActionLocked(Sync, item); err != nil { + if err := f.queueActionLocked(action, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } @@ -600,10 +661,14 @@ const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" - // The other types are obvious. You'll get Sync deltas when: - // * A watch expires/errors out and a new list/watch cycle is started. - // * You've turned on periodic syncs. - // (Anything that trigger's DeltaFIFO's Replace() method.) + // Replaced is emitted when we encountered watch errors and had to do a + // relist. We don't know if the replaced object has changed. + // + // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events + // as well. Hence, Replaced is only emitted when the option + // EmitDeltaTypeReplaced is true. + Replaced DeltaType = "Replaced" + // Sync is for synthetic events during a periodic resync. Sync DeltaType = "Sync" ) diff --git a/tools/cache/delta_fifo_test.go b/tools/cache/delta_fifo_test.go index 65e7922b..32cc96ac 100644 --- a/tools/cache/delta_fifo_test.go +++ b/tools/cache/delta_fifo_test.go @@ -288,6 +288,24 @@ func TestDeltaFIFO_ResyncNonExisting(t *testing.T) { } } +func TestDeltaFIFO_Resync(t *testing.T) { + f := NewDeltaFIFO( + testFifoObjectKeyFunc, + literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + ) + f.Resync() + + deltas := f.items["foo"] + if len(deltas) != 1 { + t.Fatalf("unexpected deltas length: %v", deltas) + } + if deltas[0].Type != Sync { + t.Errorf("unexpected delta: %v", deltas[0]) + } +} + func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, @@ -384,6 +402,60 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) { } } +// TestDeltaFIFO_ReplaceMakesDeletionsReplaced is the same as the above test, but +// ensures that a Replaced DeltaType is emitted. +func TestDeltaFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)} + }), + EmitDeltaTypeReplaced: true, + }) + + f.Delete(mkFifoObj("baz", 10)) + f.Replace([]interface{}{mkFifoObj("foo", 6)}, "0") + + expectedList := []Deltas{ + {{Deleted, mkFifoObj("baz", 10)}}, + {{Replaced, mkFifoObj("foo", 6)}}, + // Since "bar" didn't have a delete event and wasn't in the Replace list + // it should get a tombstone key with the right Obj. + {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + +// TestDeltaFIFO_ReplaceDeltaType checks that passing EmitDeltaTypeReplaced +// means that Replaced is correctly emitted. +func TestDeltaFIFO_ReplaceDeltaType(t *testing.T) { + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: testFifoObjectKeyFunc, + KnownObjects: literalListerGetter(func() []testFifoObject { + return []testFifoObject{mkFifoObj("foo", 5)} + }), + EmitDeltaTypeReplaced: true, + }) + f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0") + + expectedList := []Deltas{ + {{Replaced, mkFifoObj("foo", 5)}}, + } + + for _, expected := range expectedList { + cur := Pop(f).(Deltas) + if e, a := expected, cur; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + } +} + func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { f := NewDeltaFIFO( testFifoObjectKeyFunc, diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index d355e848..f7dc1345 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -319,7 +319,10 @@ type deleteNotification struct { func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) + fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KnownObjects: s.indexer, + EmitDeltaTypeReplaced: true, + }) cfg := &Config{ Queue: fifo, @@ -478,19 +481,19 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { - case Sync, Added, Updated: - isSync := d.Type == Sync + case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } + isSync := d.Type == Sync s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } - s.processor.distribute(addNotification{newObj: d.Object}, isSync) + s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 22e4a90d..26a7a162 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -92,7 +92,7 @@ func (l *testListener) satisfiedExpectations() bool { l.lock.RLock() defer l.lock.RUnlock() - return len(l.receivedItemNames) == l.expectedItemNames.Len() && sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames) + return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames) } func TestListenerResyncPeriods(t *testing.T) { @@ -263,3 +263,70 @@ func TestSharedInformerInitializationRace(t *testing.T) { go informer.Run(stop) close(stop) } + +// TestSharedInformerWatchDisruption simulates a watch that was closed +// with updates to the store during that time. We ensure that handlers with +// resync and no resync see the expected state. +func TestSharedInformerWatchDisruption(t *testing.T) { + // source simulates an apiserver object endpoint. + source := fcache.NewFakeControllerSource() + + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1"}}) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}}) + + // create the shared informer and resync every 1s + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) + + clock := clock.NewFakeClock(time.Now()) + informer.clock = clock + informer.processor.clock = clock + + // listener, never resync + listenerNoResync := newTestListener("listenerNoResync", 0, "pod1", "pod2") + informer.AddEventHandlerWithResyncPeriod(listenerNoResync, listenerNoResync.resyncPeriod) + + listenerResync := newTestListener("listenerResync", 1*time.Second, "pod1", "pod2") + informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod) + listeners := []*testListener{listenerNoResync, listenerResync} + + stop := make(chan struct{}) + defer close(stop) + + go informer.Run(stop) + + for _, listener := range listeners { + if !listener.ok() { + t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames) + } + } + + // Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist + source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3"}}) + source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}}) + + // Ensure that nobody saw any changes + for _, listener := range listeners { + if !listener.ok() { + t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames) + } + } + + for _, listener := range listeners { + listener.receivedItemNames = []string{} + } + + listenerNoResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3") + listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3") + + // This calls shouldSync, which deletes noResync from the list of syncingListeners + clock.Step(1 * time.Second) + + // Simulate a connection loss (or even just a too-old-watch) + source.ResetWatch() + + for _, listener := range listeners { + if !listener.ok() { + t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames) + } + } +} diff --git a/tools/cache/testing/fake_controller_source.go b/tools/cache/testing/fake_controller_source.go index 24362801..16e66fc6 100644 --- a/tools/cache/testing/fake_controller_source.go +++ b/tools/cache/testing/fake_controller_source.go @@ -18,11 +18,13 @@ package framework import ( "errors" + "fmt" "math/rand" "strconv" "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -59,6 +61,7 @@ type FakeControllerSource struct { Items map[nnu]runtime.Object changes []watch.Event // one change per resourceVersion Broadcaster *watch.Broadcaster + lastRV int } type FakePVControllerSource struct { @@ -75,6 +78,16 @@ type nnu struct { uid types.UID } +// ResetWatch simulates connection problems; creates a new Broadcaster and flushes +// the change queue so that clients have to re-list and watch. +func (f *FakeControllerSource) ResetWatch() { + f.lock.Lock() + defer f.lock.Unlock() + f.Broadcaster.Shutdown() + f.Broadcaster = watch.NewBroadcaster(100, watch.WaitIfChannelFull) + f.changes = []watch.Event{} +} + // Add adds an object to the set and sends an add event to watchers. // obj's ResourceVersion is set. func (f *FakeControllerSource) Add(obj runtime.Object) { @@ -129,8 +142,8 @@ func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) { panic(err) // this is test code only } - resourceVersion := len(f.changes) + 1 - accessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + f.lastRV += 1 + accessor.SetResourceVersion(strconv.Itoa(f.lastRV)) f.changes = append(f.changes, e) key := f.key(accessor) switch e.Type { @@ -173,8 +186,7 @@ func (f *FakeControllerSource) List(options metav1.ListOptions) (runtime.Object, if err != nil { return nil, err } - resourceVersion := len(f.changes) - listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV)) return listObj, nil } @@ -194,8 +206,7 @@ func (f *FakePVControllerSource) List(options metav1.ListOptions) (runtime.Objec if err != nil { return nil, err } - resourceVersion := len(f.changes) - listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV)) return listObj, nil } @@ -215,8 +226,7 @@ func (f *FakePVCControllerSource) List(options metav1.ListOptions) (runtime.Obje if err != nil { return nil, err } - resourceVersion := len(f.changes) - listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV)) return listObj, nil } @@ -229,9 +239,27 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac if err != nil { return nil, err } - if rc < len(f.changes) { + if rc < f.lastRV { + // if the change queue was flushed... + if len(f.changes) == 0 { + return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", rc, f.lastRV)) + } + + // get the RV of the oldest object in the change queue + oldestRV, err := meta.NewAccessor().ResourceVersion(f.changes[0].Object) + if err != nil { + panic(err) + } + oldestRC, err := strconv.Atoi(oldestRV) + if err != nil { + panic(err) + } + if rc < oldestRC { + return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", rc, oldestRC)) + } + changes := []watch.Event{} - for _, c := range f.changes[rc:] { + for _, c := range f.changes[rc-oldestRC+1:] { // Must make a copy to allow clients to modify the // object. Otherwise, if they make a change and write // it back, they will inadvertently change the our @@ -240,7 +268,7 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()}) } return f.Broadcaster.WatchWithPrefix(changes), nil - } else if rc > len(f.changes) { + } else if rc > f.lastRV { return nil, errors.New("resource version in the future not supported by this fake") } return f.Broadcaster.Watch(), nil diff --git a/tools/cache/testing/fake_controller_source_test.go b/tools/cache/testing/fake_controller_source_test.go index e5097c7a..817d45cd 100644 --- a/tools/cache/testing/fake_controller_source_test.go +++ b/tools/cache/testing/fake_controller_source_test.go @@ -20,7 +20,7 @@ import ( "sync" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" ) @@ -93,3 +93,44 @@ func TestRCNumber(t *testing.T) { source.Shutdown() wg.Wait() } + +// TestResetWatch validates that the FakeController correctly mocks a watch +// falling behind and ResourceVersions aging out. +func TestResetWatch(t *testing.T) { + pod := func(name string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + source := NewFakeControllerSource() + source.Add(pod("foo")) // RV = 1 + source.Modify(pod("foo")) // RV = 2 + source.Modify(pod("foo")) // RV = 3 + + // Kill watch, delete change history + source.ResetWatch() + + // This should fail, RV=1 was lost with ResetWatch + _, err := source.Watch(metav1.ListOptions{ResourceVersion: "1"}) + if err == nil { + t.Fatalf("Unexpected non-error") + } + + // This should succeed, RV=3 is current + w, err := source.Watch(metav1.ListOptions{ResourceVersion: "3"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Modify again, ensure the watch is still working + source.Modify(pod("foo")) + go consume(t, w, []string{"4"}, wg) + source.Shutdown() + wg.Wait() +}