mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #113430 from wojtek-t/refactor_storage_tests_4
Refactor WatchError test to make it generic
This commit is contained in:
commit
53afe3b674
@ -111,7 +111,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
|
|||||||
pathPrefix: path.Join("/", prefix),
|
pathPrefix: path.Join("/", prefix),
|
||||||
groupResource: groupResource,
|
groupResource: groupResource,
|
||||||
groupResourceString: groupResource.String(),
|
groupResourceString: groupResource.String(),
|
||||||
watcher: newWatcher(c, codec, groupResource, newFunc, versioner, transformer),
|
watcher: newWatcher(c, codec, groupResource, newFunc, versioner),
|
||||||
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
|
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
@ -815,7 +815,7 @@ func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
key = path.Join(s.pathPrefix, key)
|
key = path.Join(s.pathPrefix, key)
|
||||||
return s.watcher.Watch(ctx, key, int64(rev), opts.Recursive, opts.ProgressNotify, opts.Predicate)
|
return s.watcher.Watch(ctx, key, int64(rev), opts.Recursive, opts.ProgressNotify, s.transformer, opts.Predicate)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
|
func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
|
||||||
|
@ -476,14 +476,6 @@ type setupOptions struct {
|
|||||||
|
|
||||||
type setupOption func(*setupOptions)
|
type setupOption func(*setupOptions)
|
||||||
|
|
||||||
func withClient(client *clientv3.Client) setupOption {
|
|
||||||
return func(options *setupOptions) {
|
|
||||||
options.client = func(t *testing.T) *clientv3.Client {
|
|
||||||
return client
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func withClientConfig(config *embed.Config) setupOption {
|
func withClientConfig(config *embed.Config) setupOption {
|
||||||
return func(options *setupOptions) {
|
return func(options *setupOptions) {
|
||||||
options.client = func(t *testing.T) *clientv3.Client {
|
options.client = func(t *testing.T) *clientv3.Client {
|
||||||
@ -492,12 +484,6 @@ func withClientConfig(config *embed.Config) setupOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func withCodec(codec runtime.Codec) setupOption {
|
|
||||||
return func(options *setupOptions) {
|
|
||||||
options.codec = codec
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func withPrefix(prefix string) setupOption {
|
func withPrefix(prefix string) setupOption {
|
||||||
return func(options *setupOptions) {
|
return func(options *setupOptions) {
|
||||||
options.prefix = prefix
|
options.prefix = prefix
|
||||||
|
@ -18,7 +18,6 @@ package etcd3
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
@ -48,16 +47,6 @@ const (
|
|||||||
// fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error
|
// fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error
|
||||||
var fatalOnDecodeError = false
|
var fatalOnDecodeError = false
|
||||||
|
|
||||||
// errTestingDecode is the only error that testingDeferOnDecodeError catches during a panic
|
|
||||||
var errTestingDecode = errors.New("sentinel error only used during testing to indicate watch decoding error")
|
|
||||||
|
|
||||||
// testingDeferOnDecodeError is used during testing to recover from a panic caused by errTestingDecode, all other values continue to panic
|
|
||||||
func testingDeferOnDecodeError() {
|
|
||||||
if r := recover(); r != nil && r != errTestingDecode {
|
|
||||||
panic(r)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
// check to see if we are running in a test environment
|
// check to see if we are running in a test environment
|
||||||
TestOnlySetFatalOnDecodeError(true)
|
TestOnlySetFatalOnDecodeError(true)
|
||||||
@ -76,12 +65,12 @@ type watcher struct {
|
|||||||
objectType string
|
objectType string
|
||||||
groupResource schema.GroupResource
|
groupResource schema.GroupResource
|
||||||
versioner storage.Versioner
|
versioner storage.Versioner
|
||||||
transformer value.Transformer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchChan implements watch.Interface.
|
// watchChan implements watch.Interface.
|
||||||
type watchChan struct {
|
type watchChan struct {
|
||||||
watcher *watcher
|
watcher *watcher
|
||||||
|
transformer value.Transformer
|
||||||
key string
|
key string
|
||||||
initialRev int64
|
initialRev int64
|
||||||
recursive bool
|
recursive bool
|
||||||
@ -94,14 +83,13 @@ type watchChan struct {
|
|||||||
errChan chan error
|
errChan chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
|
func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner) *watcher {
|
||||||
res := &watcher{
|
res := &watcher{
|
||||||
client: client,
|
client: client,
|
||||||
codec: codec,
|
codec: codec,
|
||||||
groupResource: groupResource,
|
groupResource: groupResource,
|
||||||
newFunc: newFunc,
|
newFunc: newFunc,
|
||||||
versioner: versioner,
|
versioner: versioner,
|
||||||
transformer: transformer,
|
|
||||||
}
|
}
|
||||||
if newFunc == nil {
|
if newFunc == nil {
|
||||||
res.objectType = "<unknown>"
|
res.objectType = "<unknown>"
|
||||||
@ -118,11 +106,11 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource sche
|
|||||||
// If recursive is false, it watches on given key.
|
// If recursive is false, it watches on given key.
|
||||||
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
|
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
|
||||||
// pred must be non-nil. Only if pred matches the change, it will be returned.
|
// pred must be non-nil. Only if pred matches the change, it will be returned.
|
||||||
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) (watch.Interface, error) {
|
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) (watch.Interface, error) {
|
||||||
if recursive && !strings.HasSuffix(key, "/") {
|
if recursive && !strings.HasSuffix(key, "/") {
|
||||||
key += "/"
|
key += "/"
|
||||||
}
|
}
|
||||||
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred)
|
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, transformer, pred)
|
||||||
go wc.run()
|
go wc.run()
|
||||||
|
|
||||||
// For etcd watch we don't have an easy way to answer whether the watch
|
// For etcd watch we don't have an easy way to answer whether the watch
|
||||||
@ -135,9 +123,10 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, p
|
|||||||
return wc, nil
|
return wc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
|
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) *watchChan {
|
||||||
wc := &watchChan{
|
wc := &watchChan{
|
||||||
watcher: w,
|
watcher: w,
|
||||||
|
transformer: transformer,
|
||||||
key: key,
|
key: key,
|
||||||
initialRev: rev,
|
initialRev: rev,
|
||||||
recursive: recursive,
|
recursive: recursive,
|
||||||
@ -429,7 +418,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !e.isDeleted {
|
if !e.isDeleted {
|
||||||
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key))
|
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -444,7 +433,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
|
|||||||
// we need the object only to compute whether it was filtered out
|
// we need the object only to compute whether it was filtered out
|
||||||
// before).
|
// before).
|
||||||
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
|
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
|
||||||
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
|
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -462,9 +451,6 @@ func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, re
|
|||||||
obj, err := runtime.Decode(codec, []byte(data))
|
obj, err := runtime.Decode(codec, []byte(data))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if fatalOnDecodeError {
|
if fatalOnDecodeError {
|
||||||
// catch watch decode error iff we caused it on
|
|
||||||
// purpose during a unit test
|
|
||||||
defer testingDeferOnDecodeError()
|
|
||||||
// we are running in a test environment and thus an
|
// we are running in a test environment and thus an
|
||||||
// error here is due to a coder mistake if the defer
|
// error here is due to a coder mistake if the defer
|
||||||
// does not catch it
|
// does not catch it
|
||||||
|
@ -27,14 +27,11 @@ import (
|
|||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
|
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
|
||||||
)
|
)
|
||||||
@ -62,22 +59,8 @@ func TestWatchFromNoneZero(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchError(t *testing.T) {
|
func TestWatchError(t *testing.T) {
|
||||||
// this codec fails on decodes, which will bubble up so we can verify the behavior
|
ctx, store, _ := testSetup(t)
|
||||||
invalidCodec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
|
storagetesting.RunTestWatchError(ctx, t, &storeWithPrefixTransformer{store})
|
||||||
ctx, invalidStore, client := testSetup(t, withCodec(invalidCodec))
|
|
||||||
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Watch failed: %v", err)
|
|
||||||
}
|
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
|
||||||
_, validStore, _ := testSetup(t, withCodec(codec), withClient(client))
|
|
||||||
if err := validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
|
|
||||||
func(runtime.Object) (runtime.Object, error) {
|
|
||||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
|
|
||||||
}), nil); err != nil {
|
|
||||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
|
||||||
}
|
|
||||||
storagetesting.TestCheckEventType(t, watch.Error, w)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatchContextCancel(t *testing.T) {
|
func TestWatchContextCancel(t *testing.T) {
|
||||||
@ -88,7 +71,7 @@ func TestWatchContextCancel(t *testing.T) {
|
|||||||
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
||||||
origCtx, store, _ := testSetup(t)
|
origCtx, store, _ := testSetup(t)
|
||||||
ctx, cancel := context.WithCancel(origCtx)
|
ctx, cancel := context.WithCancel(origCtx)
|
||||||
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
|
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, newTestTransformer(), storage.Everything)
|
||||||
// make resutlChan and errChan blocking to ensure ordering.
|
// make resutlChan and errChan blocking to ensure ordering.
|
||||||
w.resultChan = make(chan watch.Event)
|
w.resultChan = make(chan watch.Event)
|
||||||
w.errChan = make(chan error)
|
w.errChan = make(chan error)
|
||||||
@ -213,11 +196,3 @@ func TestProgressNotify(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type testCodec struct {
|
|
||||||
runtime.Codec
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *testCodec) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
|
|
||||||
return nil, nil, errTestingDecode
|
|
||||||
}
|
|
||||||
|
@ -265,3 +265,16 @@ func (rt *reproducingTransformer) createObject(ctx context.Context) error {
|
|||||||
out := &example.Pod{}
|
out := &example.Pod{}
|
||||||
return rt.store.Create(ctx, key, obj, out, 0)
|
return rt.store.Create(ctx, key, obj, out, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// failingTransformer is a custom test-only transformer that always returns
|
||||||
|
// an error on transforming data from storage.
|
||||||
|
type failingTransformer struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ft *failingTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||||
|
return nil, false, fmt.Errorf("failed transformation")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ft *failingTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
"k8s.io/apiserver/pkg/storage/value"
|
||||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -204,6 +205,39 @@ func RunTestWatchFromNoneZero(ctx context.Context, t *testing.T, store storage.I
|
|||||||
TestCheckResult(t, watch.Modified, w, out)
|
TestCheckResult(t, watch.Modified, w, out)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RunTestWatchError(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
|
||||||
|
// Compute the initial resource version from which we can start watching later.
|
||||||
|
list := &example.PodList{}
|
||||||
|
storageOpts := storage.ListOptions{
|
||||||
|
ResourceVersion: "0",
|
||||||
|
Predicate: storage.Everything,
|
||||||
|
Recursive: true,
|
||||||
|
}
|
||||||
|
if err := store.GetList(ctx, "/", storageOpts, list); err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := store.GuaranteedUpdate(ctx, "//foo", &example.Pod{}, true, nil, storage.SimpleUpdate(
|
||||||
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
|
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
|
||||||
|
}), nil); err != nil {
|
||||||
|
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now trigger watch error by injecting failing transformer.
|
||||||
|
revertTransformer := store.UpdatePrefixTransformer(
|
||||||
|
func(previousTransformer *PrefixTransformer) value.Transformer {
|
||||||
|
return &failingTransformer{}
|
||||||
|
})
|
||||||
|
defer revertTransformer()
|
||||||
|
|
||||||
|
w, err := store.Watch(ctx, "//foo", storage.ListOptions{ResourceVersion: list.ResourceVersion, Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
TestCheckEventType(t, watch.Error, w)
|
||||||
|
}
|
||||||
|
|
||||||
func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
canceledCtx, cancel := context.WithCancel(ctx)
|
canceledCtx, cancel := context.WithCancel(ctx)
|
||||||
cancel()
|
cancel()
|
||||||
|
Loading…
Reference in New Issue
Block a user