storage/etcd: simplify passing the transformer

This commit is contained in:
Lukasz Szaszkiewicz 2023-08-21 12:27:41 +02:00
parent 6161752ecb
commit a5600b6925
4 changed files with 11 additions and 9 deletions

View File

@ -120,6 +120,7 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
groupResource: groupResource, groupResource: groupResource,
newFunc: newFunc, newFunc: newFunc,
versioner: versioner, versioner: versioner,
transformer: transformer,
} }
if newFunc == nil { if newFunc == nil {
w.objectType = "<unknown>" w.objectType = "<unknown>"
@ -904,7 +905,7 @@ func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), s.transformer, opts) return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), opts)
} }
func (s *store) watchContext(ctx context.Context) context.Context { func (s *store) watchContext(ctx context.Context) context.Context {

View File

@ -155,8 +155,10 @@ func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetes
originalTransformer := s.transformer.(*storagetesting.PrefixTransformer) originalTransformer := s.transformer.(*storagetesting.PrefixTransformer)
transformer := *originalTransformer transformer := *originalTransformer
s.transformer = modifier(&transformer) s.transformer = modifier(&transformer)
s.watcher.transformer = modifier(&transformer)
return func() { return func() {
s.transformer = originalTransformer s.transformer = originalTransformer
s.watcher.transformer = originalTransformer
} }
} }

View File

@ -68,12 +68,12 @@ type watcher struct {
objectType string objectType string
groupResource schema.GroupResource groupResource schema.GroupResource
versioner storage.Versioner versioner storage.Versioner
transformer value.Transformer
} }
// watchChan implements watch.Interface. // watchChan implements watch.Interface.
type watchChan struct { type watchChan struct {
watcher *watcher watcher *watcher
transformer value.Transformer
key string key string
initialRev int64 initialRev int64
recursive bool recursive bool
@ -93,11 +93,11 @@ type watchChan struct {
// If opts.Recursive is false, it watches on given key. // If opts.Recursive is false, it watches on given key.
// If opts.Recursive is true, it watches any children and directories under the key, excluding the root key itself. // If opts.Recursive is true, it watches any children and directories under the key, excluding the root key itself.
// pred must be non-nil. Only if opts.Predicate matches the change, it will be returned. // pred must be non-nil. Only if opts.Predicate matches the change, it will be returned.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, transformer value.Transformer, opts storage.ListOptions) (watch.Interface, error) { func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage.ListOptions) (watch.Interface, error) {
if opts.Recursive && !strings.HasSuffix(key, "/") { if opts.Recursive && !strings.HasSuffix(key, "/") {
key += "/" key += "/"
} }
wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, transformer, opts.Predicate) wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, opts.Predicate)
go wc.run() go wc.run()
// For etcd watch we don't have an easy way to answer whether the watch // For etcd watch we don't have an easy way to answer whether the watch
@ -110,10 +110,9 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, transformer
return wc, nil return wc, nil
} }
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) *watchChan { func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
wc := &watchChan{ wc := &watchChan{
watcher: w, watcher: w,
transformer: transformer,
key: key, key: key,
initialRev: rev, initialRev: rev,
recursive: recursive, recursive: recursive,
@ -430,7 +429,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
} }
if !e.isDeleted { if !e.isDeleted {
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key)) data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key))
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -445,7 +444,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
// we need the object only to compute whether it was filtered out // we need the object only to compute whether it was filtered out
// before). // before).
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) { if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key)) data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -115,7 +115,7 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
origCtx, store, _ := testSetup(t) origCtx, store, _ := testSetup(t)
ctx, cancel := context.WithCancel(origCtx) ctx, cancel := context.WithCancel(origCtx)
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, newTestTransformer(), storage.Everything) w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
// make resultChan and errChan blocking to ensure ordering. // make resultChan and errChan blocking to ensure ordering.
w.resultChan = make(chan watch.Event) w.resultChan = make(chan watch.Event)
w.errChan = make(chan error) w.errChan = make(chan error)