informers: don't treat relist same as sync

Background:

Before this change, DeltaFIFO emits the Sync DeltaType on Resync() and
Replace(). Seperately, the SharedInformer will only pass that event
on to handlers that have a ResyncInterval and are due for Resync. This
can cause updates to be lost if an object changes as part of the Replace(),
as it may be incorrectly discarded if the handler does not want a Resync.

What this change does:

Creates a new DeltaType, Replaced, which is emitted by DeltaFIFO on
Replace(). For backwards compatability concerns, the old behavior of
always emitting Sync is preserved unless explicity overridden.

As a result, if an object changes (or is added) on Replace(), now all
SharedInformer handlers will get a correct Add() or Update()
notification.

One additional side-effect is that handlers which do not ever want
Resyncs will now see them for all objects that have not changed during
the Replace.

Kubernetes-commit: ca1eeb99b530a6d76b464dad545abc18d4508c49
This commit is contained in:
Casey Callendrello
2019-12-16 18:34:30 +01:00
committed by Kubernetes Publisher
parent 802190f49e
commit b775e00fe5
5 changed files with 226 additions and 16 deletions

View File

@@ -92,7 +92,7 @@ func (l *testListener) satisfiedExpectations() bool {
l.lock.RLock()
defer l.lock.RUnlock()
return len(l.receivedItemNames) == l.expectedItemNames.Len() && sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames)
return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames)
}
func TestListenerResyncPeriods(t *testing.T) {
@@ -263,3 +263,70 @@ func TestSharedInformerInitializationRace(t *testing.T) {
go informer.Run(stop)
close(stop)
}
// TestSharedInformerWatchDisruption simulates a watch that was closed
// with updates to the store during that time. We ensure that handlers with
// resync and no resync see the expected state.
func TestSharedInformerWatchDisruption(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}})
// create the shared informer and resync every 1s
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
clock := clock.NewFakeClock(time.Now())
informer.clock = clock
informer.processor.clock = clock
// listener, never resync
listenerNoResync := newTestListener("listenerNoResync", 0, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listenerNoResync, listenerNoResync.resyncPeriod)
listenerResync := newTestListener("listenerResync", 1*time.Second, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod)
listeners := []*testListener{listenerNoResync, listenerResync}
stop := make(chan struct{})
defer close(stop)
go informer.Run(stop)
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
// Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist
source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3"}})
source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}})
// Ensure that nobody saw any changes
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
for _, listener := range listeners {
listener.receivedItemNames = []string{}
}
listenerNoResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
// This calls shouldSync, which deletes noResync from the list of syncingListeners
clock.Step(1 * time.Second)
// Simulate a connection loss (or even just a too-old-watch)
source.ResetWatch()
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
}