diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 6d2fe822467..f4b66858908 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -111,7 +111,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob pathPrefix: path.Join("/", prefix), groupResource: groupResource, groupResourceString: groupResource.String(), - watcher: newWatcher(c, codec, groupResource, newFunc, versioner, transformer), + watcher: newWatcher(c, codec, groupResource, newFunc, versioner), leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), } return result @@ -815,7 +815,7 @@ func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) return nil, err } key = path.Join(s.pathPrefix, key) - return s.watcher.Watch(ctx, key, int64(rev), opts.Recursive, opts.ProgressNotify, opts.Predicate) + return s.watcher.Watch(ctx, key, int64(rev), opts.Recursive, opts.ProgressNotify, s.transformer, opts.Predicate) } func (s *store) getState(ctx context.Context, getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index c0536fa4b57..d81eaef8bd6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -476,14 +476,6 @@ type setupOptions struct { type setupOption func(*setupOptions) -func withClient(client *clientv3.Client) setupOption { - return func(options *setupOptions) { - options.client = func(t *testing.T) *clientv3.Client { - return client - } - } -} - func withClientConfig(config *embed.Config) setupOption { return func(options *setupOptions) { options.client = func(t *testing.T) *clientv3.Client { @@ -492,12 +484,6 @@ func withClientConfig(config *embed.Config) setupOption { } } -func withCodec(codec runtime.Codec) setupOption { - return func(options *setupOptions) { - options.codec = codec - } -} - func withPrefix(prefix string) setupOption { return func(options *setupOptions) { options.prefix = prefix diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index afb79662cdb..c0b7be35c55 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -18,7 +18,6 @@ package etcd3 import ( "context" - "errors" "fmt" "os" "reflect" @@ -48,16 +47,6 @@ const ( // fatalOnDecodeError is used during testing to panic the server if watcher encounters a decoding error var fatalOnDecodeError = false -// errTestingDecode is the only error that testingDeferOnDecodeError catches during a panic -var errTestingDecode = errors.New("sentinel error only used during testing to indicate watch decoding error") - -// testingDeferOnDecodeError is used during testing to recover from a panic caused by errTestingDecode, all other values continue to panic -func testingDeferOnDecodeError() { - if r := recover(); r != nil && r != errTestingDecode { - panic(r) - } -} - func init() { // check to see if we are running in a test environment TestOnlySetFatalOnDecodeError(true) @@ -76,12 +65,12 @@ type watcher struct { objectType string groupResource schema.GroupResource versioner storage.Versioner - transformer value.Transformer } // watchChan implements watch.Interface. type watchChan struct { watcher *watcher + transformer value.Transformer key string initialRev int64 recursive bool @@ -94,14 +83,13 @@ type watchChan struct { errChan chan error } -func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher { +func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner) *watcher { res := &watcher{ client: client, codec: codec, groupResource: groupResource, newFunc: newFunc, versioner: versioner, - transformer: transformer, } if newFunc == nil { res.objectType = "" @@ -118,11 +106,11 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource sche // If recursive is false, it watches on given key. // If recursive is true, it watches any children and directories under the key, excluding the root key itself. // pred must be non-nil. Only if pred matches the change, it will be returned. -func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) (watch.Interface, error) { +func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) (watch.Interface, error) { if recursive && !strings.HasSuffix(key, "/") { key += "/" } - wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred) + wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, transformer, pred) go wc.run() // For etcd watch we don't have an easy way to answer whether the watch @@ -135,9 +123,10 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, p return wc, nil } -func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan { +func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) *watchChan { wc := &watchChan{ watcher: w, + transformer: transformer, key: key, initialRev: rev, recursive: recursive, @@ -429,7 +418,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim } if !e.isDeleted { - data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key)) + data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key)) if err != nil { return nil, nil, err } @@ -444,7 +433,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim // we need the object only to compute whether it was filtered out // before). if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) { - data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key)) + data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key)) if err != nil { return nil, nil, err } @@ -462,9 +451,6 @@ func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, re obj, err := runtime.Decode(codec, []byte(data)) if err != nil { if fatalOnDecodeError { - // catch watch decode error iff we caused it on - // purpose during a unit test - defer testingDeferOnDecodeError() // we are running in a test environment and thus an // error here is due to a coder mistake if the defer // does not catch it diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index cadd90cc3bf..c00321ce7ef 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -27,14 +27,11 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" storagetesting "k8s.io/apiserver/pkg/storage/testing" - "k8s.io/apimachinery/pkg/api/apitesting" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" - examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3/testserver" ) @@ -62,22 +59,8 @@ func TestWatchFromNoneZero(t *testing.T) { } func TestWatchError(t *testing.T) { - // this codec fails on decodes, which will bubble up so we can verify the behavior - invalidCodec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} - ctx, invalidStore, client := testSetup(t, withCodec(invalidCodec)) - w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - _, validStore, _ := testSetup(t, withCodec(codec), withClient(client)) - if err := validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( - func(runtime.Object) (runtime.Object, error) { - return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil - }), nil); err != nil { - t.Fatalf("GuaranteedUpdate failed: %v", err) - } - storagetesting.TestCheckEventType(t, watch.Error, w) + ctx, store, _ := testSetup(t) + storagetesting.RunTestWatchError(ctx, t, &storeWithPrefixTransformer{store}) } func TestWatchContextCancel(t *testing.T) { @@ -88,7 +71,7 @@ func TestWatchContextCancel(t *testing.T) { func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { origCtx, store, _ := testSetup(t) ctx, cancel := context.WithCancel(origCtx) - w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything) + w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, newTestTransformer(), storage.Everything) // make resutlChan and errChan blocking to ensure ordering. w.resultChan = make(chan watch.Event) w.errChan = make(chan error) @@ -213,11 +196,3 @@ func TestProgressNotify(t *testing.T) { return nil }) } - -type testCodec struct { - runtime.Codec -} - -func (c *testCodec) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { - return nil, nil, errTestingDecode -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go index 6c595589839..edef41fdcaa 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go @@ -265,3 +265,16 @@ func (rt *reproducingTransformer) createObject(ctx context.Context) error { out := &example.Pod{} return rt.store.Create(ctx, key, obj, out, 0) } + +// failingTransformer is a custom test-only transformer that always returns +// an error on transforming data from storage. +type failingTransformer struct { +} + +func (ft *failingTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) { + return nil, false, fmt.Errorf("failed transformation") +} + +func (ft *failingTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) { + return data, nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index d8823db90ac..36ffb9374d1 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/value" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" ) @@ -204,6 +205,39 @@ func RunTestWatchFromNoneZero(ctx context.Context, t *testing.T, store storage.I TestCheckResult(t, watch.Modified, w, out) } +func RunTestWatchError(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) { + // Compute the initial resource version from which we can start watching later. + list := &example.PodList{} + storageOpts := storage.ListOptions{ + ResourceVersion: "0", + Predicate: storage.Everything, + Recursive: true, + } + if err := store.GetList(ctx, "/", storageOpts, list); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if err := store.GuaranteedUpdate(ctx, "//foo", &example.Pod{}, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil + }), nil); err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + // Now trigger watch error by injecting failing transformer. + revertTransformer := store.UpdatePrefixTransformer( + func(previousTransformer *PrefixTransformer) value.Transformer { + return &failingTransformer{} + }) + defer revertTransformer() + + w, err := store.Watch(ctx, "//foo", storage.ListOptions{ResourceVersion: list.ResourceVersion, Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + TestCheckEventType(t, watch.Error, w) +} + func RunTestWatchContextCancel(ctx context.Context, t *testing.T, store storage.Interface) { canceledCtx, cancel := context.WithCancel(ctx) cancel()