From 6161752ecbe434b8fb157dd3a30bb5c0cef7a23e Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 21 Aug 2023 12:13:12 +0200 Subject: [PATCH 1/2] storage/etcd: remove newWatcher function --- .../apiserver/pkg/storage/etcd3/store.go | 20 ++++++++++++++++--- .../apiserver/pkg/storage/etcd3/watcher.go | 17 ---------------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index c76ba284374..2660fc9464e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -113,7 +113,21 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob // Ensure the pathPrefix ends in "/" here to simplify key concatenation later. pathPrefix += "/" } - result := &store{ + + w := &watcher{ + client: c, + codec: codec, + groupResource: groupResource, + newFunc: newFunc, + versioner: versioner, + } + if newFunc == nil { + w.objectType = "" + } else { + w.objectType = reflect.TypeOf(newFunc()).String() + } + + s := &store{ client: c, codec: codec, versioner: versioner, @@ -122,10 +136,10 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob pathPrefix: pathPrefix, groupResource: groupResource, groupResourceString: groupResource.String(), - watcher: newWatcher(c, codec, groupResource, newFunc, versioner), + watcher: w, leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), } - return result + return s } // Versioner implements storage.Interface.Versioner. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index cd1c4d1ec2e..2e0efffc501 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "os" - "reflect" "strconv" "strings" "sync" @@ -87,22 +86,6 @@ type watchChan struct { errChan chan error } -func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner) *watcher { - res := &watcher{ - client: client, - codec: codec, - groupResource: groupResource, - newFunc: newFunc, - versioner: versioner, - } - if newFunc == nil { - res.objectType = "" - } else { - res.objectType = reflect.TypeOf(newFunc()).String() - } - return res -} - // Watch watches on a key and returns a watch.Interface that transfers relevant notifications. // If rev is zero, it will return the existing object(s) and then start watching from // the maximum revision+1 from returned objects. From a5600b6925169a7d99376d52472e5f8e8635082b Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 21 Aug 2023 12:27:41 +0200 Subject: [PATCH 2/2] storage/etcd: simplify passing the transformer --- .../src/k8s.io/apiserver/pkg/storage/etcd3/store.go | 3 ++- .../apiserver/pkg/storage/etcd3/store_test.go | 2 ++ .../k8s.io/apiserver/pkg/storage/etcd3/watcher.go | 13 ++++++------- .../apiserver/pkg/storage/etcd3/watcher_test.go | 2 +- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 2660fc9464e..e110fc88bbc 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -120,6 +120,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob groupResource: groupResource, newFunc: newFunc, versioner: versioner, + transformer: transformer, } if newFunc == nil { w.objectType = "" @@ -904,7 +905,7 @@ func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) if err != nil { return nil, err } - return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), s.transformer, opts) + return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), opts) } func (s *store) watchContext(ctx context.Context) context.Context { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index ffc9891b8da..e5f55f4e599 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -155,8 +155,10 @@ func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetes originalTransformer := s.transformer.(*storagetesting.PrefixTransformer) transformer := *originalTransformer s.transformer = modifier(&transformer) + s.watcher.transformer = modifier(&transformer) return func() { s.transformer = originalTransformer + s.watcher.transformer = originalTransformer } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 2e0efffc501..0fde8655b0b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -68,12 +68,12 @@ type watcher struct { objectType string groupResource schema.GroupResource versioner storage.Versioner + transformer value.Transformer } // watchChan implements watch.Interface. type watchChan struct { watcher *watcher - transformer value.Transformer key string initialRev int64 recursive bool @@ -93,11 +93,11 @@ type watchChan struct { // If opts.Recursive is false, it watches on given key. // If opts.Recursive is true, it watches any children and directories under the key, excluding the root key itself. // pred must be non-nil. Only if opts.Predicate matches the change, it will be returned. -func (w *watcher) Watch(ctx context.Context, key string, rev int64, transformer value.Transformer, opts storage.ListOptions) (watch.Interface, error) { +func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage.ListOptions) (watch.Interface, error) { if opts.Recursive && !strings.HasSuffix(key, "/") { key += "/" } - wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, transformer, opts.Predicate) + wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, opts.Predicate) go wc.run() // For etcd watch we don't have an easy way to answer whether the watch @@ -110,10 +110,9 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, transformer return wc, nil } -func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) *watchChan { +func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan { wc := &watchChan{ watcher: w, - transformer: transformer, key: key, initialRev: rev, recursive: recursive, @@ -430,7 +429,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim } if !e.isDeleted { - data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key)) + data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key)) if err != nil { return nil, nil, err } @@ -445,7 +444,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim // we need the object only to compute whether it was filtered out // before). if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) { - data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key)) + data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key)) if err != nil { return nil, nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 180df5d5003..e3fa2dc3533 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -115,7 +115,7 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) { func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { origCtx, store, _ := testSetup(t) ctx, cancel := context.WithCancel(origCtx) - w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, newTestTransformer(), storage.Everything) + w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything) // make resultChan and errChan blocking to ensure ordering. w.resultChan = make(chan watch.Event) w.errChan = make(chan error)