From a5600b6925169a7d99376d52472e5f8e8635082b Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 21 Aug 2023 12:27:41 +0200 Subject: [PATCH] storage/etcd: simplify passing the transformer --- .../src/k8s.io/apiserver/pkg/storage/etcd3/store.go | 3 ++- .../apiserver/pkg/storage/etcd3/store_test.go | 2 ++ .../k8s.io/apiserver/pkg/storage/etcd3/watcher.go | 13 ++++++------- .../apiserver/pkg/storage/etcd3/watcher_test.go | 2 +- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 2660fc9464e..e110fc88bbc 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -120,6 +120,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob groupResource: groupResource, newFunc: newFunc, versioner: versioner, + transformer: transformer, } if newFunc == nil { w.objectType = "" @@ -904,7 +905,7 @@ func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) if err != nil { return nil, err } - return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), s.transformer, opts) + return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), opts) } func (s *store) watchContext(ctx context.Context) context.Context { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index ffc9891b8da..e5f55f4e599 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -155,8 +155,10 @@ func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetes originalTransformer := s.transformer.(*storagetesting.PrefixTransformer) transformer := *originalTransformer s.transformer = modifier(&transformer) + s.watcher.transformer = modifier(&transformer) return func() { s.transformer = originalTransformer + s.watcher.transformer = originalTransformer } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 2e0efffc501..0fde8655b0b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -68,12 +68,12 @@ type watcher struct { objectType string groupResource schema.GroupResource versioner storage.Versioner + transformer value.Transformer } // watchChan implements watch.Interface. type watchChan struct { watcher *watcher - transformer value.Transformer key string initialRev int64 recursive bool @@ -93,11 +93,11 @@ type watchChan struct { // If opts.Recursive is false, it watches on given key. // If opts.Recursive is true, it watches any children and directories under the key, excluding the root key itself. // pred must be non-nil. Only if opts.Predicate matches the change, it will be returned. -func (w *watcher) Watch(ctx context.Context, key string, rev int64, transformer value.Transformer, opts storage.ListOptions) (watch.Interface, error) { +func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage.ListOptions) (watch.Interface, error) { if opts.Recursive && !strings.HasSuffix(key, "/") { key += "/" } - wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, transformer, opts.Predicate) + wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, opts.Predicate) go wc.run() // For etcd watch we don't have an easy way to answer whether the watch @@ -110,10 +110,9 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, transformer return wc, nil } -func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) *watchChan { +func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan { wc := &watchChan{ watcher: w, - transformer: transformer, key: key, initialRev: rev, recursive: recursive, @@ -430,7 +429,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim } if !e.isDeleted { - data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key)) + data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key)) if err != nil { return nil, nil, err } @@ -445,7 +444,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim // we need the object only to compute whether it was filtered out // before). if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) { - data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key)) + data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key)) if err != nil { return nil, nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 180df5d5003..e3fa2dc3533 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -115,7 +115,7 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) { func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { origCtx, store, _ := testSetup(t) ctx, cancel := context.WithCancel(origCtx) - w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, newTestTransformer(), storage.Everything) + w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything) // make resultChan and errChan blocking to ensure ordering. w.resultChan = make(chan watch.Event) w.errChan = make(chan error)