From 1988c31fc8115bf9eec1adf2bffed3fd677d1a9f Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 28 Aug 2023 17:50:42 +0200 Subject: [PATCH] storage/etcd3: error when progressNotify option set and newFunc was provided for a registry --- .../apiserver/pkg/storage/etcd3/watcher.go | 7 +++--- .../pkg/storage/etcd3/watcher_test.go | 25 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 0fde8655b0b..4f6f8f09e3c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -18,6 +18,7 @@ package etcd3 import ( "context" + "errors" "fmt" "os" "strconv" @@ -97,6 +98,9 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage if opts.Recursive && !strings.HasSuffix(key, "/") { key += "/" } + if opts.ProgressNotify && w.newFunc == nil { + return nil, apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided")) + } wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, opts.Predicate) go wc.run() @@ -334,9 +338,6 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { switch { case e.isProgressNotify: - if wc.watcher.newFunc == nil { - return nil - } object := wc.watcher.newFunc() if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil { klog.Errorf("failed to propagate object version: %v", err) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index e3fa2dc3533..35c3a3245d2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -18,11 +18,14 @@ package etcd3 import ( "context" + "errors" "fmt" + "sync" "testing" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3/testserver" @@ -133,3 +136,25 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { cancel() wg.Wait() } + +// TestWatchErrorWhenNoNewFunc checks if an error +// will be returned when establishing a watch +// with progressNotify options set +// when newFunc wasn't provided +func TestWatchErrorWhenNoNewFunc(t *testing.T) { + origCtx, store, _ := testSetup(t, func(opts *setupOptions) { opts.newFunc = nil }) + ctx, cancel := context.WithCancel(origCtx) + defer cancel() + + w, err := store.watcher.Watch(ctx, "/abc", 0, storage.ListOptions{ProgressNotify: true}) + if err == nil { + t.Fatalf("expected an error but got none") + } + if w != nil { + t.Fatalf("didn't expect a watcher because progress notifications cannot be delivered for a watcher without newFunc") + } + expectedError := apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided")) + if err.Error() != expectedError.Error() { + t.Fatalf("unexpected err = %v, expected = %v", err, expectedError) + } +}