mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-13 13:55:41 +00:00
Merge pull request #120081 from p0lyn0mial/upstream-storage-etcd-watcher-refactor
storage/etcd: no-op, simplify watcher construction
This commit is contained in:
commit
bcbceea117
@ -113,7 +113,22 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
|
||||
// Ensure the pathPrefix ends in "/" here to simplify key concatenation later.
|
||||
pathPrefix += "/"
|
||||
}
|
||||
result := &store{
|
||||
|
||||
w := &watcher{
|
||||
client: c,
|
||||
codec: codec,
|
||||
groupResource: groupResource,
|
||||
newFunc: newFunc,
|
||||
versioner: versioner,
|
||||
transformer: transformer,
|
||||
}
|
||||
if newFunc == nil {
|
||||
w.objectType = "<unknown>"
|
||||
} else {
|
||||
w.objectType = reflect.TypeOf(newFunc()).String()
|
||||
}
|
||||
|
||||
s := &store{
|
||||
client: c,
|
||||
codec: codec,
|
||||
versioner: versioner,
|
||||
@ -122,10 +137,10 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Ob
|
||||
pathPrefix: pathPrefix,
|
||||
groupResource: groupResource,
|
||||
groupResourceString: groupResource.String(),
|
||||
watcher: newWatcher(c, codec, groupResource, newFunc, versioner),
|
||||
watcher: w,
|
||||
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
|
||||
}
|
||||
return result
|
||||
return s
|
||||
}
|
||||
|
||||
// Versioner implements storage.Interface.Versioner.
|
||||
@ -890,7 +905,7 @@ func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), s.transformer, opts)
|
||||
return s.watcher.Watch(s.watchContext(ctx), preparedKey, int64(rev), opts)
|
||||
}
|
||||
|
||||
func (s *store) watchContext(ctx context.Context) context.Context {
|
||||
|
@ -155,8 +155,10 @@ func (s *storeWithPrefixTransformer) UpdatePrefixTransformer(modifier storagetes
|
||||
originalTransformer := s.transformer.(*storagetesting.PrefixTransformer)
|
||||
transformer := *originalTransformer
|
||||
s.transformer = modifier(&transformer)
|
||||
s.watcher.transformer = modifier(&transformer)
|
||||
return func() {
|
||||
s.transformer = originalTransformer
|
||||
s.watcher.transformer = originalTransformer
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -69,12 +68,12 @@ type watcher struct {
|
||||
objectType string
|
||||
groupResource schema.GroupResource
|
||||
versioner storage.Versioner
|
||||
transformer value.Transformer
|
||||
}
|
||||
|
||||
// watchChan implements watch.Interface.
|
||||
type watchChan struct {
|
||||
watcher *watcher
|
||||
transformer value.Transformer
|
||||
key string
|
||||
initialRev int64
|
||||
recursive bool
|
||||
@ -87,22 +86,6 @@ type watchChan struct {
|
||||
errChan chan error
|
||||
}
|
||||
|
||||
func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource schema.GroupResource, newFunc func() runtime.Object, versioner storage.Versioner) *watcher {
|
||||
res := &watcher{
|
||||
client: client,
|
||||
codec: codec,
|
||||
groupResource: groupResource,
|
||||
newFunc: newFunc,
|
||||
versioner: versioner,
|
||||
}
|
||||
if newFunc == nil {
|
||||
res.objectType = "<unknown>"
|
||||
} else {
|
||||
res.objectType = reflect.TypeOf(newFunc()).String()
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
|
||||
// If rev is zero, it will return the existing object(s) and then start watching from
|
||||
// the maximum revision+1 from returned objects.
|
||||
@ -110,11 +93,11 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, groupResource sche
|
||||
// If opts.Recursive is false, it watches on given key.
|
||||
// If opts.Recursive is true, it watches any children and directories under the key, excluding the root key itself.
|
||||
// pred must be non-nil. Only if opts.Predicate matches the change, it will be returned.
|
||||
func (w *watcher) Watch(ctx context.Context, key string, rev int64, transformer value.Transformer, opts storage.ListOptions) (watch.Interface, error) {
|
||||
func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage.ListOptions) (watch.Interface, error) {
|
||||
if opts.Recursive && !strings.HasSuffix(key, "/") {
|
||||
key += "/"
|
||||
}
|
||||
wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, transformer, opts.Predicate)
|
||||
wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, opts.Predicate)
|
||||
go wc.run()
|
||||
|
||||
// For etcd watch we don't have an easy way to answer whether the watch
|
||||
@ -127,10 +110,9 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, transformer
|
||||
return wc, nil
|
||||
}
|
||||
|
||||
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, transformer value.Transformer, pred storage.SelectionPredicate) *watchChan {
|
||||
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
|
||||
wc := &watchChan{
|
||||
watcher: w,
|
||||
transformer: transformer,
|
||||
key: key,
|
||||
initialRev: rev,
|
||||
recursive: recursive,
|
||||
@ -447,7 +429,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
|
||||
}
|
||||
|
||||
if !e.isDeleted {
|
||||
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key))
|
||||
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.value, authenticatedDataString(e.key))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -462,7 +444,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
|
||||
// we need the object only to compute whether it was filtered out
|
||||
// before).
|
||||
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
|
||||
data, _, err := wc.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
|
||||
data, _, err := wc.watcher.transformer.TransformFromStorage(wc.ctx, e.prevValue, authenticatedDataString(e.key))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -115,7 +115,7 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
|
||||
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
||||
origCtx, store, _ := testSetup(t)
|
||||
ctx, cancel := context.WithCancel(origCtx)
|
||||
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, newTestTransformer(), storage.Everything)
|
||||
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
|
||||
// make resultChan and errChan blocking to ensure ordering.
|
||||
w.resultChan = make(chan watch.Event)
|
||||
w.errChan = make(chan error)
|
||||
|
Loading…
Reference in New Issue
Block a user