diff --git a/tools/cache/shared_informer.go b/tools/cache/shared_informer.go index 6aefd56c..826a7046 100644 --- a/tools/cache/shared_informer.go +++ b/tools/cache/shared_informer.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -486,7 +487,21 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { if err := s.indexer.Update(d.Object); err != nil { return err } - isSync := d.Type == Sync + + isSync := false + switch { + case d.Type == Sync: + // Sync events are only propagated to listeners that requested resync + isSync = true + case d.Type == Replaced: + if accessor, err := meta.Accessor(d.Object); err == nil { + if oldAccessor, err := meta.Accessor(old); err == nil { + // Replaced events that didn't change resourceVersion are treated as resync events + // and only propagated to listeners that requested resync + isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() + } + } + } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 26a7a162..5468fc74 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -271,8 +271,8 @@ func TestSharedInformerWatchDisruption(t *testing.T) { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource() - source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1"}}) - source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}}) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}}) + source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}}) // create the shared informer and resync every 1s informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -301,8 +301,8 @@ func TestSharedInformerWatchDisruption(t *testing.T) { } // Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist - source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3"}}) - source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}}) + source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3", ResourceVersion: "3"}}) + source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "4"}}) // Ensure that nobody saw any changes for _, listener := range listeners { @@ -315,7 +315,7 @@ func TestSharedInformerWatchDisruption(t *testing.T) { listener.receivedItemNames = []string{} } - listenerNoResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3") + listenerNoResync.expectedItemNames = sets.NewString("pod2", "pod3") listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3") // This calls shouldSync, which deletes noResync from the list of syncingListeners