Update TestNewInformerWatcher for WatchListClient

- Switch to using the ProxyWatcher to validate the dance between
  closing the stop channel and closing the result channel.
- Use the new clientfeaturestesting.SetFeatureDuringTest to test with
  the WatchListClient enabled and disabled. These should result in
  almost the exact same output events from the informer
  (list ordering not garenteed), but with different input events
  recieved from the apiserver.

Kubernetes-commit: 28e3a728e5e6fe651d7a17839d33ce42204c0b4e
This commit is contained in:
Karl Isenberg 2024-06-03 17:31:59 -07:00 committed by Kubernetes Publisher
parent 1fc4f8ad34
commit 7c5e9038dd

View File

@ -32,7 +32,10 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/dump"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
clientfeaturestesting "k8s.io/client-go/features/testing"
fakeclientset "k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
@ -134,71 +137,151 @@ func (a byEventTypeAndName) Less(i, j int) bool {
return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
}
func newTestSecret(name, key, value string) *corev1.Secret {
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
StringData: map[string]string{
key: value,
},
}
}
func TestNewInformerWatcher(t *testing.T) {
// Make sure there are no 2 same types of events on a secret with the same name or that might be flaky.
tt := []struct {
name string
objects []runtime.Object
events []watch.Event
name string
watchListFeatureEnabled bool
objects []runtime.Object
inputEvents []watch.Event
outputEvents []watch.Event
}{
{
name: "basic test",
name: "WatchListClient feature disabled",
watchListFeatureEnabled: false,
objects: []runtime.Object{
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
},
StringData: map[string]string{
"foo-1": "initial",
},
newTestSecret("pod-1", "foo-1", "initial"),
newTestSecret("pod-2", "foo-2", "initial"),
newTestSecret("pod-3", "foo-3", "initial"),
},
inputEvents: []watch.Event{
{
Type: watch.Added,
Object: newTestSecret("pod-4", "foo-4", "initial"),
},
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-2",
},
StringData: map[string]string{
"foo-2": "initial",
},
{
Type: watch.Modified,
Object: newTestSecret("pod-2", "foo-2", "new"),
},
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-3",
},
StringData: map[string]string{
"foo-3": "initial",
},
{
Type: watch.Deleted,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
},
events: []watch.Event{
outputEvents: []watch.Event{
// When WatchListClient is disabled, ListAndWatch creates fake
// ADDED events for each object listed.
{
Type: watch.Added,
Type: watch.Added,
Object: newTestSecret("pod-1", "foo-1", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-2", "foo-2", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
// Normal events follow.
{
Type: watch.Added,
Object: newTestSecret("pod-4", "foo-4", "initial"),
},
{
Type: watch.Modified,
Object: newTestSecret("pod-2", "foo-2", "new"),
},
{
Type: watch.Deleted,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
},
},
{
name: "WatchListClient feature enabled",
watchListFeatureEnabled: true,
objects: []runtime.Object{
newTestSecret("pod-1", "foo-1", "initial"),
newTestSecret("pod-2", "foo-2", "initial"),
newTestSecret("pod-3", "foo-3", "initial"),
},
inputEvents: []watch.Event{
{
Type: watch.Added,
Object: newTestSecret("pod-1", "foo-1", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-2", "foo-2", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
// ListWatch bookmark indicates that initial listing is done
{
Type: watch.Bookmark,
Object: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-4",
},
StringData: map[string]string{
"foo-4": "initial",
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
},
},
},
{
Type: watch.Modified,
Object: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-2",
},
StringData: map[string]string{
"foo-2": "new",
},
},
Type: watch.Added,
Object: newTestSecret("pod-4", "foo-4", "initial"),
},
{
Type: watch.Deleted,
Object: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-3",
},
},
Type: watch.Modified,
Object: newTestSecret("pod-2", "foo-2", "new"),
},
{
Type: watch.Deleted,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
},
outputEvents: []watch.Event{
// When WatchListClient is enabled, WatchList receives
// ADDED events from the server for each existing object.
{
Type: watch.Added,
Object: newTestSecret("pod-1", "foo-1", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-2", "foo-2", "initial"),
},
{
Type: watch.Added,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
// Bookmark event at the end of listing is not sent to the client.
// Normal events follow.
{
Type: watch.Added,
Object: newTestSecret("pod-4", "foo-4", "initial"),
},
{
Type: watch.Modified,
Object: newTestSecret("pod-2", "foo-2", "new"),
},
{
Type: watch.Deleted,
Object: newTestSecret("pod-3", "foo-3", "initial"),
},
},
},
@ -206,24 +289,28 @@ func TestNewInformerWatcher(t *testing.T) {
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
var expected []watch.Event
for _, o := range tc.objects {
expected = append(expected, watch.Event{
Type: watch.Added,
Object: o.DeepCopyObject(),
})
}
for _, e := range tc.events {
expected = append(expected, *e.DeepCopy())
}
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, tc.watchListFeatureEnabled)
fake := fakeclientset.NewSimpleClientset(tc.objects...)
fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false)
fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil))
for _, e := range tc.events {
fakeWatch.Action(e.Type, e.Object)
}
inputCh := make(chan watch.Event)
inputWatcher := watch.NewProxyWatcher(inputCh)
// Indexer should stop the input watcher when the output watcher is stopped.
// But stop it at the end of the test, just in case.
defer inputWatcher.Stop()
inputStopCh := inputWatcher.StopChan()
fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(inputWatcher, nil))
// Send events and then close the done channel
inputDoneCh := make(chan struct{})
go func() {
defer close(inputDoneCh)
for _, e := range tc.inputEvents {
select {
case inputCh <- e:
case <-inputStopCh:
return
}
}
}()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@ -233,46 +320,58 @@ func TestNewInformerWatcher(t *testing.T) {
return fake.CoreV1().Secrets("").Watch(context.TODO(), options)
},
}
_, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{})
_, _, outputWatcher, informerDoneCh := NewIndexerInformerWatcher(lw, &corev1.Secret{})
outputCh := outputWatcher.ResultChan()
timeoutCh := time.After(wait.ForeverTestTimeout)
var result []watch.Event
loop:
for {
var event watch.Event
var ok bool
select {
case event, ok = <-w.ResultChan():
case event, ok := <-outputCh:
if !ok {
t.Errorf("Failed to read event: channel is already closed!")
return
t.Errorf("Output result channel closed prematurely")
break loop
}
result = append(result, *event.DeepCopy())
case <-time.After(time.Second * 1):
// All the events are buffered -> this means we are done
// Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event
if len(result) >= len(tc.outputEvents) {
break loop
}
case <-timeoutCh:
t.Error("Timed out waiting for events")
break loop
}
}
// Informers don't guarantee event order so we need to sort these arrays to compare them
sort.Sort(byEventTypeAndName(expected))
// Informers don't guarantee event order so we need to sort these arrays to compare them.
sort.Sort(byEventTypeAndName(tc.outputEvents))
sort.Sort(byEventTypeAndName(result))
if !reflect.DeepEqual(expected, result) {
t.Errorf("\nexpected: %s,\ngot: %s,\ndiff: %s", dump.Pretty(expected), dump.Pretty(result), cmp.Diff(expected, result))
if !reflect.DeepEqual(tc.outputEvents, result) {
t.Errorf("\nexpected: %s,\ngot: %s,\ndiff: %s", dump.Pretty(tc.outputEvents), dump.Pretty(result), cmp.Diff(tc.outputEvents, result))
return
}
// Fill in some data to test watch closing while there are some events to be read
for _, e := range tc.events {
fakeWatch.Action(e.Type, e.Object)
}
// Send some more events, but don't read them.
// Stop producing events when the consumer stops the watcher.
go func() {
defer close(inputCh)
for _, e := range tc.inputEvents {
select {
case inputCh <- e:
case <-inputStopCh:
return
}
}
}()
// Stop before reading all the data to make sure the informer can deal with closed channel
w.Stop()
outputWatcher.Stop()
<-done
select {
case <-informerDoneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Error("Timed out waiting for informer to cleanup")
}
})
}