Merge pull request #86015 from squeed/informer-missing-updates

informers: Don't treat relist same as sync

Kubernetes-commit: 9f09913dbf5188f0ccaecded80766a14309a6992
This commit is contained in:
Kubernetes Publisher 2020-01-23 17:40:32 -08:00
commit 7a0c6cfdb0
7 changed files with 308 additions and 29 deletions

View File

@ -364,7 +364,10 @@ func newInformer(
// This will hold incoming changes. Note how we pass clientState in as a // This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set // KeyLister, that way resync operations will result in the correct set
// of update/delete deltas. // of update/delete deltas.
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState) fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{ cfg := &Config{
Queue: fifo, Queue: fifo,
@ -377,7 +380,7 @@ func newInformer(
// from oldest to newest // from oldest to newest
for _, d := range obj.(Deltas) { for _, d := range obj.(Deltas) {
switch d.Type { switch d.Type {
case Sync, Added, Updated: case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists { if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil { if err := clientState.Update(d.Object); err != nil {
return err return err

View File

@ -29,7 +29,8 @@ import (
// NewDeltaFIFO returns a Queue which can be used to process changes to items. // NewDeltaFIFO returns a Queue which can be used to process changes to items.
// //
// keyFunc is used to figure out what key an object should have. (It is // keyFunc is used to figure out what key an object should have. (It is
// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.) // exposed in the returned DeltaFIFO's KeyOf() method, with additional handling
// around deleted objects and queue state).
// //
// 'knownObjects' may be supplied to modify the behavior of Delete, // 'knownObjects' may be supplied to modify the behavior of Delete,
// Replace, and Resync. It may be nil if you do not need those // Replace, and Resync. It may be nil if you do not need those
@ -56,12 +57,62 @@ import (
// and internal tests. // and internal tests.
// //
// Also see the comment on DeltaFIFO. // Also see the comment on DeltaFIFO.
//
// Warning: This constructs a DeltaFIFO that does not differentiate between
// events caused by a call to Replace (e.g., from a relist, which may
// contain object updates), and synthetic events caused by a periodic resync
// (which just emit the existing object). See https://issue.k8s.io/86015 for details.
//
// Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})`
// instead to receive a `Replaced` event depending on the type.
//
// Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: keyFunc,
KnownObjects: knownObjects,
})
}
// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
// optional.
type DeltaFIFOOptions struct {
// KeyFunction is used to figure out what key an object should have. (It's
// exposed in the returned DeltaFIFO's KeyOf() method, with additional
// handling around deleted objects and queue state).
// Optional, the default is MetaNamespaceKeyFunc.
KeyFunction KeyFunc
// KnownObjects is expected to return a list of keys that the consumer of
// this queue "knows about". It is used to decide which items are missing
// when Replace() is called; 'Deleted' deltas are produced for the missing items.
// KnownObjects may be nil if you can tolerate missing deletions on Replace().
KnownObjects KeyListerGetter
// EmitDeltaTypeReplaced indicates that the queue consumer
// understands the Replaced DeltaType. Before the `Replaced` event type was
// added, calls to Replace() were handled the same as Sync(). For
// backwards-compatibility purposes, this is false by default.
// When true, `Replaced` events will be sent for items passed to a Replace() call.
// When false, `Sync` events will be sent instead.
EmitDeltaTypeReplaced bool
}
// NewDeltaFIFOWithOptions returns a Store which can be used process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}
f := &DeltaFIFO{ f := &DeltaFIFO{
items: map[string]Deltas{}, items: map[string]Deltas{},
queue: []string{}, queue: []string{},
keyFunc: keyFunc, keyFunc: opts.KeyFunction,
knownObjects: knownObjects, knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
} }
f.cond.L = &f.lock f.cond.L = &f.lock
return f return f
@ -134,6 +185,10 @@ type DeltaFIFO struct {
// Currently, not used to gate any of CRED operations. // Currently, not used to gate any of CRED operations.
closed bool closed bool
closedLock sync.Mutex closedLock sync.Mutex
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool
} }
var ( var (
@ -446,7 +501,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
} }
// Replace atomically does two things: (1) it adds the given objects // Replace atomically does two things: (1) it adds the given objects
// using the Sync type of Delta and then (2) it does some deletions. // using the Sync or Replace DeltaType and then (2) it does some deletions.
// In particular: for every pre-existing key K that is not the key of // In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of // an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object // `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
@ -460,13 +515,19 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
defer f.lock.Unlock() defer f.lock.Unlock()
keys := make(sets.String, len(list)) keys := make(sets.String, len(list))
// keep backwards compat for old clients
action := Sync
if f.emitDeltaTypeReplaced {
action = Replaced
}
for _, item := range list { for _, item := range list {
key, err := f.KeyOf(item) key, err := f.KeyOf(item)
if err != nil { if err != nil {
return KeyError{item, err} return KeyError{item, err}
} }
keys.Insert(key) keys.Insert(key)
if err := f.queueActionLocked(Sync, item); err != nil { if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err) return fmt.Errorf("couldn't enqueue object: %v", err)
} }
} }
@ -600,10 +661,14 @@ const (
Added DeltaType = "Added" Added DeltaType = "Added"
Updated DeltaType = "Updated" Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted" Deleted DeltaType = "Deleted"
// The other types are obvious. You'll get Sync deltas when: // Replaced is emitted when we encountered watch errors and had to do a
// * A watch expires/errors out and a new list/watch cycle is started. // relist. We don't know if the replaced object has changed.
// * You've turned on periodic syncs. //
// (Anything that trigger's DeltaFIFO's Replace() method.) // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
// as well. Hence, Replaced is only emitted when the option
// EmitDeltaTypeReplaced is true.
Replaced DeltaType = "Replaced"
// Sync is for synthetic events during a periodic resync.
Sync DeltaType = "Sync" Sync DeltaType = "Sync"
) )

View File

@ -288,6 +288,24 @@ func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
} }
} }
func TestDeltaFIFO_Resync(t *testing.T) {
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5)}
}),
)
f.Resync()
deltas := f.items["foo"]
if len(deltas) != 1 {
t.Fatalf("unexpected deltas length: %v", deltas)
}
if deltas[0].Type != Sync {
t.Errorf("unexpected delta: %v", deltas[0])
}
}
func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) { func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
f := NewDeltaFIFO( f := NewDeltaFIFO(
testFifoObjectKeyFunc, testFifoObjectKeyFunc,
@ -384,6 +402,60 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
} }
} }
// TestDeltaFIFO_ReplaceMakesDeletionsReplaced is the same as the above test, but
// ensures that a Replaced DeltaType is emitted.
func TestDeltaFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
EmitDeltaTypeReplaced: true,
})
f.Delete(mkFifoObj("baz", 10))
f.Replace([]interface{}{mkFifoObj("foo", 6)}, "0")
expectedList := []Deltas{
{{Deleted, mkFifoObj("baz", 10)}},
{{Replaced, mkFifoObj("foo", 6)}},
// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},
}
for _, expected := range expectedList {
cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
}
// TestDeltaFIFO_ReplaceDeltaType checks that passing EmitDeltaTypeReplaced
// means that Replaced is correctly emitted.
func TestDeltaFIFO_ReplaceDeltaType(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5)}
}),
EmitDeltaTypeReplaced: true,
})
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
expectedList := []Deltas{
{{Replaced, mkFifoObj("foo", 5)}},
}
for _, expected := range expectedList {
cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
}
func TestDeltaFIFO_UpdateResyncRace(t *testing.T) { func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
f := NewDeltaFIFO( f := NewDeltaFIFO(
testFifoObjectKeyFunc, testFifoObjectKeyFunc,

View File

@ -319,7 +319,10 @@ type deleteNotification struct {
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{ cfg := &Config{
Queue: fifo, Queue: fifo,
@ -478,19 +481,19 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// from oldest to newest // from oldest to newest
for _, d := range obj.(Deltas) { for _, d := range obj.(Deltas) {
switch d.Type { switch d.Type {
case Sync, Added, Updated: case Sync, Replaced, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object) s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil { if err := s.indexer.Update(d.Object); err != nil {
return err return err
} }
isSync := d.Type == Sync
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else { } else {
if err := s.indexer.Add(d.Object); err != nil { if err := s.indexer.Add(d.Object); err != nil {
return err return err
} }
s.processor.distribute(addNotification{newObj: d.Object}, isSync) s.processor.distribute(addNotification{newObj: d.Object}, false)
} }
case Deleted: case Deleted:
if err := s.indexer.Delete(d.Object); err != nil { if err := s.indexer.Delete(d.Object); err != nil {

View File

@ -92,7 +92,7 @@ func (l *testListener) satisfiedExpectations() bool {
l.lock.RLock() l.lock.RLock()
defer l.lock.RUnlock() defer l.lock.RUnlock()
return len(l.receivedItemNames) == l.expectedItemNames.Len() && sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames) return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames)
} }
func TestListenerResyncPeriods(t *testing.T) { func TestListenerResyncPeriods(t *testing.T) {
@ -263,3 +263,70 @@ func TestSharedInformerInitializationRace(t *testing.T) {
go informer.Run(stop) go informer.Run(stop)
close(stop) close(stop)
} }
// TestSharedInformerWatchDisruption simulates a watch that was closed
// with updates to the store during that time. We ensure that handlers with
// resync and no resync see the expected state.
func TestSharedInformerWatchDisruption(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}})
// create the shared informer and resync every 1s
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
clock := clock.NewFakeClock(time.Now())
informer.clock = clock
informer.processor.clock = clock
// listener, never resync
listenerNoResync := newTestListener("listenerNoResync", 0, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listenerNoResync, listenerNoResync.resyncPeriod)
listenerResync := newTestListener("listenerResync", 1*time.Second, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod)
listeners := []*testListener{listenerNoResync, listenerResync}
stop := make(chan struct{})
defer close(stop)
go informer.Run(stop)
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
// Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist
source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3"}})
source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}})
// Ensure that nobody saw any changes
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
for _, listener := range listeners {
listener.receivedItemNames = []string{}
}
listenerNoResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
// This calls shouldSync, which deletes noResync from the list of syncingListeners
clock.Step(1 * time.Second)
// Simulate a connection loss (or even just a too-old-watch)
source.ResetWatch()
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
}

View File

@ -18,11 +18,13 @@ package framework
import ( import (
"errors" "errors"
"fmt"
"math/rand" "math/rand"
"strconv" "strconv"
"sync" "sync"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@ -59,6 +61,7 @@ type FakeControllerSource struct {
Items map[nnu]runtime.Object Items map[nnu]runtime.Object
changes []watch.Event // one change per resourceVersion changes []watch.Event // one change per resourceVersion
Broadcaster *watch.Broadcaster Broadcaster *watch.Broadcaster
lastRV int
} }
type FakePVControllerSource struct { type FakePVControllerSource struct {
@ -75,6 +78,16 @@ type nnu struct {
uid types.UID uid types.UID
} }
// ResetWatch simulates connection problems; creates a new Broadcaster and flushes
// the change queue so that clients have to re-list and watch.
func (f *FakeControllerSource) ResetWatch() {
f.lock.Lock()
defer f.lock.Unlock()
f.Broadcaster.Shutdown()
f.Broadcaster = watch.NewBroadcaster(100, watch.WaitIfChannelFull)
f.changes = []watch.Event{}
}
// Add adds an object to the set and sends an add event to watchers. // Add adds an object to the set and sends an add event to watchers.
// obj's ResourceVersion is set. // obj's ResourceVersion is set.
func (f *FakeControllerSource) Add(obj runtime.Object) { func (f *FakeControllerSource) Add(obj runtime.Object) {
@ -129,8 +142,8 @@ func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) {
panic(err) // this is test code only panic(err) // this is test code only
} }
resourceVersion := len(f.changes) + 1 f.lastRV += 1
accessor.SetResourceVersion(strconv.Itoa(resourceVersion)) accessor.SetResourceVersion(strconv.Itoa(f.lastRV))
f.changes = append(f.changes, e) f.changes = append(f.changes, e)
key := f.key(accessor) key := f.key(accessor)
switch e.Type { switch e.Type {
@ -173,8 +186,7 @@ func (f *FakeControllerSource) List(options metav1.ListOptions) (runtime.Object,
if err != nil { if err != nil {
return nil, err return nil, err
} }
resourceVersion := len(f.changes) listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV))
listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion))
return listObj, nil return listObj, nil
} }
@ -194,8 +206,7 @@ func (f *FakePVControllerSource) List(options metav1.ListOptions) (runtime.Objec
if err != nil { if err != nil {
return nil, err return nil, err
} }
resourceVersion := len(f.changes) listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV))
listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion))
return listObj, nil return listObj, nil
} }
@ -215,8 +226,7 @@ func (f *FakePVCControllerSource) List(options metav1.ListOptions) (runtime.Obje
if err != nil { if err != nil {
return nil, err return nil, err
} }
resourceVersion := len(f.changes) listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV))
listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion))
return listObj, nil return listObj, nil
} }
@ -229,9 +239,27 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac
if err != nil { if err != nil {
return nil, err return nil, err
} }
if rc < len(f.changes) { if rc < f.lastRV {
// if the change queue was flushed...
if len(f.changes) == 0 {
return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", rc, f.lastRV))
}
// get the RV of the oldest object in the change queue
oldestRV, err := meta.NewAccessor().ResourceVersion(f.changes[0].Object)
if err != nil {
panic(err)
}
oldestRC, err := strconv.Atoi(oldestRV)
if err != nil {
panic(err)
}
if rc < oldestRC {
return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", rc, oldestRC))
}
changes := []watch.Event{} changes := []watch.Event{}
for _, c := range f.changes[rc:] { for _, c := range f.changes[rc-oldestRC+1:] {
// Must make a copy to allow clients to modify the // Must make a copy to allow clients to modify the
// object. Otherwise, if they make a change and write // object. Otherwise, if they make a change and write
// it back, they will inadvertently change the our // it back, they will inadvertently change the our
@ -240,7 +268,7 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac
changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()}) changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()})
} }
return f.Broadcaster.WatchWithPrefix(changes), nil return f.Broadcaster.WatchWithPrefix(changes), nil
} else if rc > len(f.changes) { } else if rc > f.lastRV {
return nil, errors.New("resource version in the future not supported by this fake") return nil, errors.New("resource version in the future not supported by this fake")
} }
return f.Broadcaster.Watch(), nil return f.Broadcaster.Watch(), nil

View File

@ -20,7 +20,7 @@ import (
"sync" "sync"
"testing" "testing"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
) )
@ -93,3 +93,44 @@ func TestRCNumber(t *testing.T) {
source.Shutdown() source.Shutdown()
wg.Wait() wg.Wait()
} }
// TestResetWatch validates that the FakeController correctly mocks a watch
// falling behind and ResourceVersions aging out.
func TestResetWatch(t *testing.T) {
pod := func(name string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}
}
wg := &sync.WaitGroup{}
wg.Add(1)
source := NewFakeControllerSource()
source.Add(pod("foo")) // RV = 1
source.Modify(pod("foo")) // RV = 2
source.Modify(pod("foo")) // RV = 3
// Kill watch, delete change history
source.ResetWatch()
// This should fail, RV=1 was lost with ResetWatch
_, err := source.Watch(metav1.ListOptions{ResourceVersion: "1"})
if err == nil {
t.Fatalf("Unexpected non-error")
}
// This should succeed, RV=3 is current
w, err := source.Watch(metav1.ListOptions{ResourceVersion: "3"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Modify again, ensure the watch is still working
source.Modify(pod("foo"))
go consume(t, w, []string{"4"}, wg)
source.Shutdown()
wg.Wait()
}