mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Avoid unnecessary decoding in etcd3 client
This commit is contained in:
parent
0b627334df
commit
b675b2230c
@ -328,7 +328,7 @@ func (s *store) watch(ctx context.Context, key string, rv string, pred storage.S
|
||||
return nil, err
|
||||
}
|
||||
key = keyWithPrefix(s.pathPrefix, key)
|
||||
return s.watcher.Watch(ctx, key, int64(rev), recursive, storage.SimpleFilter(pred))
|
||||
return s.watcher.Watch(ctx, key, int64(rev), recursive, pred)
|
||||
}
|
||||
|
||||
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
|
||||
|
@ -51,7 +51,7 @@ type watchChan struct {
|
||||
key string
|
||||
initialRev int64
|
||||
recursive bool
|
||||
filter storage.FilterFunc
|
||||
internalFilter storage.FilterFunc
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
incomingEventChan chan *event
|
||||
@ -73,27 +73,31 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.
|
||||
// If rev is non-zero, it will watch events happened after given revision.
|
||||
// If recursive is false, it watches on given key.
|
||||
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
|
||||
// filter must be non-nil. Only if filter returns true will the changes be returned.
|
||||
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) (watch.Interface, error) {
|
||||
// pred must be non-nil. Only if pred matches the change, it will be returned.
|
||||
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) {
|
||||
if recursive && !strings.HasSuffix(key, "/") {
|
||||
key += "/"
|
||||
}
|
||||
wc := w.createWatchChan(ctx, key, rev, recursive, filter)
|
||||
wc := w.createWatchChan(ctx, key, rev, recursive, pred)
|
||||
go wc.run()
|
||||
return wc, nil
|
||||
}
|
||||
|
||||
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) *watchChan {
|
||||
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan {
|
||||
wc := &watchChan{
|
||||
watcher: w,
|
||||
key: key,
|
||||
initialRev: rev,
|
||||
recursive: recursive,
|
||||
filter: filter,
|
||||
internalFilter: storage.SimpleFilter(pred),
|
||||
incomingEventChan: make(chan *event, incomingBufSize),
|
||||
resultChan: make(chan watch.Event, outgoingBufSize),
|
||||
errChan: make(chan error, 1),
|
||||
}
|
||||
if pred.Label.Empty() && pred.Field.Empty() {
|
||||
// The filter doesn't filter out any object.
|
||||
wc.internalFilter = nil
|
||||
}
|
||||
wc.ctx, wc.cancel = context.WithCancel(ctx)
|
||||
return wc
|
||||
}
|
||||
@ -231,16 +235,20 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) filter(obj runtime.Object) bool {
|
||||
if wc.internalFilter == nil {
|
||||
return true
|
||||
}
|
||||
return wc.internalFilter(obj)
|
||||
}
|
||||
|
||||
func (wc *watchChan) acceptAll() bool {
|
||||
return wc.internalFilter == nil
|
||||
}
|
||||
|
||||
// transform transforms an event into a result for user if not filtered.
|
||||
// TODO (Optimization):
|
||||
// - Save remote round-trip.
|
||||
// Currently, DELETE and PUT event don't contain the previous value.
|
||||
// We need to do another Get() in order to get previous object and have logic upon it.
|
||||
// We could potentially do some optimizations:
|
||||
// - For PUT, we can save current and previous objects into the value.
|
||||
// - For DELETE, See https://github.com/coreos/etcd/issues/4620
|
||||
func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
||||
curObj, oldObj, err := prepareObjs(wc.ctx, e, wc.watcher.client, wc.watcher.codec, wc.watcher.versioner)
|
||||
curObj, oldObj, err := wc.prepareObjs(e)
|
||||
if err != nil {
|
||||
glog.Errorf("failed to prepare current and previous objects: %v", err)
|
||||
wc.sendError(err)
|
||||
@ -265,6 +273,13 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
||||
Object: curObj,
|
||||
}
|
||||
default:
|
||||
if wc.acceptAll() {
|
||||
res = &watch.Event{
|
||||
Type: watch.Modified,
|
||||
Object: curObj,
|
||||
}
|
||||
return res
|
||||
}
|
||||
curObjPasses := wc.filter(curObj)
|
||||
oldObjPasses := wc.filter(oldObj)
|
||||
switch {
|
||||
@ -332,19 +347,22 @@ func (wc *watchChan) sendEvent(e *event) {
|
||||
}
|
||||
}
|
||||
|
||||
func prepareObjs(ctx context.Context, e *event, client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner) (curObj runtime.Object, oldObj runtime.Object, err error) {
|
||||
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
|
||||
if !e.isDeleted {
|
||||
curObj, err = decodeObj(codec, versioner, e.value, e.rev)
|
||||
curObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.value, e.rev)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
if len(e.prevValue) > 0 {
|
||||
// We need to decode prevValue, only if this is deletion event or
|
||||
// the underlying filter doesn't accept all objects (otherwise we
|
||||
// know that the filter for previous object will return true and
|
||||
// we need the object only to compute whether it was filtered out
|
||||
// before).
|
||||
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
|
||||
// Note that this sends the *old* object with the etcd revision for the time at
|
||||
// which it gets deleted.
|
||||
// We assume old object is returned only in Deleted event. Users (e.g. cacher) need
|
||||
// to have larger than previous rev to tell the ordering.
|
||||
oldObj, err = decodeObj(codec, versioner, e.prevValue, e.rev)
|
||||
oldObj, err = decodeObj(wc.watcher.codec, wc.watcher.versioner, e.prevValue, e.rev)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -197,7 +197,7 @@ func TestWatchContextCancel(t *testing.T) {
|
||||
cancel()
|
||||
// When we watch with a canceled context, we should detect that it's context canceled.
|
||||
// We won't take it as error and also close the watcher.
|
||||
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.SimpleFilter(storage.Everything))
|
||||
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -216,7 +216,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
||||
origCtx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, cancel := context.WithCancel(origCtx)
|
||||
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.SimpleFilter(storage.Everything))
|
||||
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything)
|
||||
// make resutlChan and errChan blocking to ensure ordering.
|
||||
w.resultChan = make(chan watch.Event)
|
||||
w.errChan = make(chan error)
|
||||
|
Loading…
Reference in New Issue
Block a user