Merge pull request #87957 from liggitt/noop-deltafifo-resourceversion

Treat replaced events that didn't change resourceVersion as resync events

Kubernetes-commit: af618bd100004ecb41ddaf1ee25d8e1d0ea628f3
This commit is contained in:
Kubernetes Publisher 2020-02-10 07:07:54 -08:00
commit c416eeb74a
2 changed files with 21 additions and 6 deletions

View File

@ -21,6 +21,7 @@ import (
"sync" "sync"
"time" "time"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -486,7 +487,21 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
if err := s.indexer.Update(d.Object); err != nil { if err := s.indexer.Update(d.Object); err != nil {
return err return err
} }
isSync := d.Type == Sync
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else { } else {
if err := s.indexer.Add(d.Object); err != nil { if err := s.indexer.Add(d.Object); err != nil {

View File

@ -271,8 +271,8 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
// source simulates an apiserver object endpoint. // source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource() source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1"}}) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}}) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
// create the shared informer and resync every 1s // create the shared informer and resync every 1s
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@ -301,8 +301,8 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
} }
// Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist // Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist
source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3"}}) source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3", ResourceVersion: "3"}})
source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}}) source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "4"}})
// Ensure that nobody saw any changes // Ensure that nobody saw any changes
for _, listener := range listeners { for _, listener := range listeners {
@ -315,7 +315,7 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
listener.receivedItemNames = []string{} listener.receivedItemNames = []string{}
} }
listenerNoResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3") listenerNoResync.expectedItemNames = sets.NewString("pod2", "pod3")
listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3") listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
// This calls shouldSync, which deletes noResync from the list of syncingListeners // This calls shouldSync, which deletes noResync from the list of syncingListeners