From 5aacacbdf000cee2d0ec548ee4afe564f35c60bf Mon Sep 17 00:00:00 2001 From: Casey Callendrello Date: Fri, 6 Dec 2019 22:19:42 +0100 Subject: [PATCH] client-go/cache/testing: add ability to simulate watch disruption This adds ResetWatch() to the FakeControllerSource, which lets the controller simulate a re-list-and-watch. --- .../client-go/tools/cache/testing/BUILD | 1 + .../cache/testing/fake_controller_source.go | 52 ++++++++++++++----- .../testing/fake_controller_source_test.go | 43 ++++++++++++++- 3 files changed, 83 insertions(+), 13 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/testing/BUILD b/staging/src/k8s.io/client-go/tools/cache/testing/BUILD index aedea862a8f..4fbfb83bedd 100644 --- a/staging/src/k8s.io/client-go/tools/cache/testing/BUILD +++ b/staging/src/k8s.io/client-go/tools/cache/testing/BUILD @@ -24,6 +24,7 @@ go_library( importpath = "k8s.io/client-go/tools/cache/testing", deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", diff --git a/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source.go b/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source.go index 24362801b86..16e66fc676a 100644 --- a/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source.go +++ b/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source.go @@ -18,11 +18,13 @@ package framework import ( "errors" + "fmt" "math/rand" "strconv" "sync" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -59,6 +61,7 @@ type FakeControllerSource struct { Items map[nnu]runtime.Object changes []watch.Event // one change per resourceVersion Broadcaster *watch.Broadcaster + lastRV int } type FakePVControllerSource struct { @@ -75,6 +78,16 @@ type nnu struct { uid types.UID } +// ResetWatch simulates connection problems; creates a new Broadcaster and flushes +// the change queue so that clients have to re-list and watch. +func (f *FakeControllerSource) ResetWatch() { + f.lock.Lock() + defer f.lock.Unlock() + f.Broadcaster.Shutdown() + f.Broadcaster = watch.NewBroadcaster(100, watch.WaitIfChannelFull) + f.changes = []watch.Event{} +} + // Add adds an object to the set and sends an add event to watchers. // obj's ResourceVersion is set. func (f *FakeControllerSource) Add(obj runtime.Object) { @@ -129,8 +142,8 @@ func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) { panic(err) // this is test code only } - resourceVersion := len(f.changes) + 1 - accessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + f.lastRV += 1 + accessor.SetResourceVersion(strconv.Itoa(f.lastRV)) f.changes = append(f.changes, e) key := f.key(accessor) switch e.Type { @@ -173,8 +186,7 @@ func (f *FakeControllerSource) List(options metav1.ListOptions) (runtime.Object, if err != nil { return nil, err } - resourceVersion := len(f.changes) - listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV)) return listObj, nil } @@ -194,8 +206,7 @@ func (f *FakePVControllerSource) List(options metav1.ListOptions) (runtime.Objec if err != nil { return nil, err } - resourceVersion := len(f.changes) - listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV)) return listObj, nil } @@ -215,8 +226,7 @@ func (f *FakePVCControllerSource) List(options metav1.ListOptions) (runtime.Obje if err != nil { return nil, err } - resourceVersion := len(f.changes) - listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion)) + listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV)) return listObj, nil } @@ -229,9 +239,27 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac if err != nil { return nil, err } - if rc < len(f.changes) { + if rc < f.lastRV { + // if the change queue was flushed... + if len(f.changes) == 0 { + return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", rc, f.lastRV)) + } + + // get the RV of the oldest object in the change queue + oldestRV, err := meta.NewAccessor().ResourceVersion(f.changes[0].Object) + if err != nil { + panic(err) + } + oldestRC, err := strconv.Atoi(oldestRV) + if err != nil { + panic(err) + } + if rc < oldestRC { + return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", rc, oldestRC)) + } + changes := []watch.Event{} - for _, c := range f.changes[rc:] { + for _, c := range f.changes[rc-oldestRC+1:] { // Must make a copy to allow clients to modify the // object. Otherwise, if they make a change and write // it back, they will inadvertently change the our @@ -240,7 +268,7 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()}) } return f.Broadcaster.WatchWithPrefix(changes), nil - } else if rc > len(f.changes) { + } else if rc > f.lastRV { return nil, errors.New("resource version in the future not supported by this fake") } return f.Broadcaster.Watch(), nil diff --git a/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source_test.go b/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source_test.go index e5097c7a469..817d45cd011 100644 --- a/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/testing/fake_controller_source_test.go @@ -20,7 +20,7 @@ import ( "sync" "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" ) @@ -93,3 +93,44 @@ func TestRCNumber(t *testing.T) { source.Shutdown() wg.Wait() } + +// TestResetWatch validates that the FakeController correctly mocks a watch +// falling behind and ResourceVersions aging out. +func TestResetWatch(t *testing.T) { + pod := func(name string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + source := NewFakeControllerSource() + source.Add(pod("foo")) // RV = 1 + source.Modify(pod("foo")) // RV = 2 + source.Modify(pod("foo")) // RV = 3 + + // Kill watch, delete change history + source.ResetWatch() + + // This should fail, RV=1 was lost with ResetWatch + _, err := source.Watch(metav1.ListOptions{ResourceVersion: "1"}) + if err == nil { + t.Fatalf("Unexpected non-error") + } + + // This should succeed, RV=3 is current + w, err := source.Watch(metav1.ListOptions{ResourceVersion: "3"}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Modify again, ensure the watch is still working + source.Modify(pod("foo")) + go consume(t, w, []string{"4"}, wg) + source.Shutdown() + wg.Wait() +}