mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
client-go/cache/testing: add ability to simulate watch disruption
This adds ResetWatch() to the FakeControllerSource, which lets the controller simulate a re-list-and-watch.
This commit is contained in:
parent
33aa665c34
commit
5aacacbdf0
@ -24,6 +24,7 @@ go_library(
|
|||||||
importpath = "k8s.io/client-go/tools/cache/testing",
|
importpath = "k8s.io/client-go/tools/cache/testing",
|
||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
@ -18,11 +18,13 @@ package framework
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
@ -59,6 +61,7 @@ type FakeControllerSource struct {
|
|||||||
Items map[nnu]runtime.Object
|
Items map[nnu]runtime.Object
|
||||||
changes []watch.Event // one change per resourceVersion
|
changes []watch.Event // one change per resourceVersion
|
||||||
Broadcaster *watch.Broadcaster
|
Broadcaster *watch.Broadcaster
|
||||||
|
lastRV int
|
||||||
}
|
}
|
||||||
|
|
||||||
type FakePVControllerSource struct {
|
type FakePVControllerSource struct {
|
||||||
@ -75,6 +78,16 @@ type nnu struct {
|
|||||||
uid types.UID
|
uid types.UID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResetWatch simulates connection problems; creates a new Broadcaster and flushes
|
||||||
|
// the change queue so that clients have to re-list and watch.
|
||||||
|
func (f *FakeControllerSource) ResetWatch() {
|
||||||
|
f.lock.Lock()
|
||||||
|
defer f.lock.Unlock()
|
||||||
|
f.Broadcaster.Shutdown()
|
||||||
|
f.Broadcaster = watch.NewBroadcaster(100, watch.WaitIfChannelFull)
|
||||||
|
f.changes = []watch.Event{}
|
||||||
|
}
|
||||||
|
|
||||||
// Add adds an object to the set and sends an add event to watchers.
|
// Add adds an object to the set and sends an add event to watchers.
|
||||||
// obj's ResourceVersion is set.
|
// obj's ResourceVersion is set.
|
||||||
func (f *FakeControllerSource) Add(obj runtime.Object) {
|
func (f *FakeControllerSource) Add(obj runtime.Object) {
|
||||||
@ -129,8 +142,8 @@ func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) {
|
|||||||
panic(err) // this is test code only
|
panic(err) // this is test code only
|
||||||
}
|
}
|
||||||
|
|
||||||
resourceVersion := len(f.changes) + 1
|
f.lastRV += 1
|
||||||
accessor.SetResourceVersion(strconv.Itoa(resourceVersion))
|
accessor.SetResourceVersion(strconv.Itoa(f.lastRV))
|
||||||
f.changes = append(f.changes, e)
|
f.changes = append(f.changes, e)
|
||||||
key := f.key(accessor)
|
key := f.key(accessor)
|
||||||
switch e.Type {
|
switch e.Type {
|
||||||
@ -173,8 +186,7 @@ func (f *FakeControllerSource) List(options metav1.ListOptions) (runtime.Object,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resourceVersion := len(f.changes)
|
listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV))
|
||||||
listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion))
|
|
||||||
return listObj, nil
|
return listObj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,8 +206,7 @@ func (f *FakePVControllerSource) List(options metav1.ListOptions) (runtime.Objec
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resourceVersion := len(f.changes)
|
listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV))
|
||||||
listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion))
|
|
||||||
return listObj, nil
|
return listObj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -215,8 +226,7 @@ func (f *FakePVCControllerSource) List(options metav1.ListOptions) (runtime.Obje
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resourceVersion := len(f.changes)
|
listAccessor.SetResourceVersion(strconv.Itoa(f.lastRV))
|
||||||
listAccessor.SetResourceVersion(strconv.Itoa(resourceVersion))
|
|
||||||
return listObj, nil
|
return listObj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,9 +239,27 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if rc < len(f.changes) {
|
if rc < f.lastRV {
|
||||||
|
// if the change queue was flushed...
|
||||||
|
if len(f.changes) == 0 {
|
||||||
|
return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", rc, f.lastRV))
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the RV of the oldest object in the change queue
|
||||||
|
oldestRV, err := meta.NewAccessor().ResourceVersion(f.changes[0].Object)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
oldestRC, err := strconv.Atoi(oldestRV)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
if rc < oldestRC {
|
||||||
|
return nil, apierrors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", rc, oldestRC))
|
||||||
|
}
|
||||||
|
|
||||||
changes := []watch.Event{}
|
changes := []watch.Event{}
|
||||||
for _, c := range f.changes[rc:] {
|
for _, c := range f.changes[rc-oldestRC+1:] {
|
||||||
// Must make a copy to allow clients to modify the
|
// Must make a copy to allow clients to modify the
|
||||||
// object. Otherwise, if they make a change and write
|
// object. Otherwise, if they make a change and write
|
||||||
// it back, they will inadvertently change the our
|
// it back, they will inadvertently change the our
|
||||||
@ -240,7 +268,7 @@ func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interfac
|
|||||||
changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()})
|
changes = append(changes, watch.Event{Type: c.Type, Object: c.Object.DeepCopyObject()})
|
||||||
}
|
}
|
||||||
return f.Broadcaster.WatchWithPrefix(changes), nil
|
return f.Broadcaster.WatchWithPrefix(changes), nil
|
||||||
} else if rc > len(f.changes) {
|
} else if rc > f.lastRV {
|
||||||
return nil, errors.New("resource version in the future not supported by this fake")
|
return nil, errors.New("resource version in the future not supported by this fake")
|
||||||
}
|
}
|
||||||
return f.Broadcaster.Watch(), nil
|
return f.Broadcaster.Watch(), nil
|
||||||
|
@ -20,7 +20,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
)
|
)
|
||||||
@ -93,3 +93,44 @@ func TestRCNumber(t *testing.T) {
|
|||||||
source.Shutdown()
|
source.Shutdown()
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestResetWatch validates that the FakeController correctly mocks a watch
|
||||||
|
// falling behind and ResourceVersions aging out.
|
||||||
|
func TestResetWatch(t *testing.T) {
|
||||||
|
pod := func(name string) *v1.Pod {
|
||||||
|
return &v1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: name,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
source := NewFakeControllerSource()
|
||||||
|
source.Add(pod("foo")) // RV = 1
|
||||||
|
source.Modify(pod("foo")) // RV = 2
|
||||||
|
source.Modify(pod("foo")) // RV = 3
|
||||||
|
|
||||||
|
// Kill watch, delete change history
|
||||||
|
source.ResetWatch()
|
||||||
|
|
||||||
|
// This should fail, RV=1 was lost with ResetWatch
|
||||||
|
_, err := source.Watch(metav1.ListOptions{ResourceVersion: "1"})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Unexpected non-error")
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should succeed, RV=3 is current
|
||||||
|
w, err := source.Watch(metav1.ListOptions{ResourceVersion: "3"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Modify again, ensure the watch is still working
|
||||||
|
source.Modify(pod("foo"))
|
||||||
|
go consume(t, w, []string{"4"}, wg)
|
||||||
|
source.Shutdown()
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user