diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go index 3e5bfb1c633..e7644ddfae6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/event.go @@ -30,6 +30,17 @@ type event struct { isDeleted bool isCreated bool isProgressNotify bool + // isInitialEventsEndBookmark helps us keep track + // of whether we have sent an annotated bookmark event. + // + // when this variable is set to true, + // a special annotation will be added + // to the bookmark event. + // + // note that we decided to extend the event + // struct field to eliminate contention + // between startWatching and processEvent + isInitialEventsEndBookmark bool } // parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 2cde92abab7..275b5d14fb9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -36,7 +36,6 @@ import ( "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/storage" @@ -112,12 +111,11 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func pathPrefix += "/" } - // TODO(p0lyn0mial): pass newListFunc and resourcePrefix to the watcher w := &watcher{ client: c, codec: codec, - groupResource: groupResource, newFunc: newFunc, + groupResource: groupResource, versioner: versioner, transformer: transformer, } @@ -126,7 +124,6 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func } else { w.objectType = reflect.TypeOf(newFunc()).String() } - s := &store{ client: c, codec: codec, @@ -139,6 +136,10 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func watcher: w, leaseManager: newDefaultLeaseManager(c, leaseManagerConfig), } + + w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) { + return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType) + } return s } @@ -855,18 +856,7 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) { } // Watch implements storage.Interface.Watch. -// TODO(#115478): In order to graduate the WatchList feature to beta, the etcd3 implementation must/should also support it. func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - // it is safe to skip SendInitialEvents if the request is backward compatible - // see https://github.com/kubernetes/kubernetes/blob/267eb25e60955fe8e438c6311412e7cf7d028acb/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go#L260 - compatibility := opts.Predicate.AllowWatchBookmarks == false && (opts.ResourceVersion == "" || opts.ResourceVersion == "0") - if opts.SendInitialEvents != nil && !compatibility { - return nil, apierrors.NewInvalid( - schema.GroupKind{Group: s.groupResource.Group, Kind: s.groupResource.Resource}, - "", - field.ErrorList{field.Forbidden(field.NewPath("sendInitialEvents"), "for watch is unsupported by an etcd cluster")}, - ) - } preparedKey, err := s.prepareKey(key) if err != nil { return nil, err diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index dbd99a3e3b1..85acf44f86b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -26,20 +26,21 @@ import ( "sync" "time" + clientv3 "go.etcd.io/etcd/client/v3" grpccodes "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3/metrics" "k8s.io/apiserver/pkg/storage/value" + utilfeature "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" - - clientv3 "go.etcd.io/etcd/client/v3" - "k8s.io/klog/v2" ) @@ -67,13 +68,14 @@ func TestOnlySetFatalOnDecodeError(b bool) { } type watcher struct { - client *clientv3.Client - codec runtime.Codec - newFunc func() runtime.Object - objectType string - groupResource schema.GroupResource - versioner storage.Versioner - transformer value.Transformer + client *clientv3.Client + codec runtime.Codec + newFunc func() runtime.Object + objectType string + groupResource schema.GroupResource + versioner storage.Versioner + transformer value.Transformer + getCurrentStorageRV func(context.Context) (uint64, error) } // watchChan implements watch.Interface. @@ -105,8 +107,12 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage if opts.ProgressNotify && w.newFunc == nil { return nil, apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided")) } - wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, opts.Predicate) - go wc.run() + startWatchRV, err := w.getStartWatchResourceVersion(ctx, rev, opts) + if err != nil { + return nil, err + } + wc := w.createWatchChan(ctx, key, startWatchRV, opts.Recursive, opts.ProgressNotify, opts.Predicate) + go wc.run(isInitialEventsEndBookmarkRequired(opts), areInitialEventsRequired(rev, opts)) // For etcd watch we don't have an easy way to answer whether the watch // has already caught up. So in the initial version (given that watchcache @@ -138,6 +144,62 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re return wc } +// getStartWatchResourceVersion returns a ResourceVersion +// the watch will be started from. +// Depending on the input parameters the semantics of the returned ResourceVersion are: +// - start at Exact (return resourceVersion) +// - start at Most Recent (return an RV from etcd) +func (w *watcher) getStartWatchResourceVersion(ctx context.Context, resourceVersion int64, opts storage.ListOptions) (int64, error) { + if resourceVersion > 0 { + return resourceVersion, nil + } + if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) { + return 0, nil + } + if opts.SendInitialEvents == nil || *opts.SendInitialEvents { + // note that when opts.SendInitialEvents=true + // we will be issuing a consistent LIST request + // against etcd followed by the special bookmark event + return 0, nil + } + // at this point the clients is interested + // only in getting a stream of events + // starting at the MostRecent point in time (RV) + currentStorageRV, err := w.getCurrentStorageRV(ctx) + if err != nil { + return 0, err + } + // currentStorageRV is taken from resp.Header.Revision (int64) + // and cast to uint64, so it is safe to do reverse + // at some point we should unify the interface but that + // would require changing Versioner.UpdateList + return int64(currentStorageRV), nil +} + +// isInitialEventsEndBookmarkRequired since there is no way to directly set +// opts.ProgressNotify from the API and the etcd3 impl doesn't support +// notification for external clients we simply return initialEventsEndBookmarkRequired +// to only send the bookmark event after the initial list call. +// +// see: https://github.com/kubernetes/kubernetes/issues/120348 +func isInitialEventsEndBookmarkRequired(opts storage.ListOptions) bool { + if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) { + return false + } + return opts.SendInitialEvents != nil && *opts.SendInitialEvents && opts.Predicate.AllowWatchBookmarks +} + +// areInitialEventsRequired returns true if all events from the etcd should be returned. +func areInitialEventsRequired(resourceVersion int64, opts storage.ListOptions) bool { + if opts.SendInitialEvents == nil && resourceVersion == 0 { + return true // legacy case + } + if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) { + return false + } + return opts.SendInitialEvents != nil && *opts.SendInitialEvents +} + type etcdError interface { Code() grpccodes.Code Error() string @@ -163,9 +225,9 @@ func isCancelError(err error) bool { return false } -func (wc *watchChan) run() { +func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bool) { watchClosedCh := make(chan struct{}) - go wc.startWatching(watchClosedCh) + go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents) var resultChanWG sync.WaitGroup resultChanWG.Add(1) @@ -284,14 +346,44 @@ func logWatchChannelErr(err error) { // startWatching does: // - get current objects if initialRev=0; set initialRev to current rev // - watch on given key and send events to process. -func (wc *watchChan) startWatching(watchClosedCh chan struct{}) { - if wc.initialRev == 0 { +// +// initialEventsEndBookmarkSent helps us keep track +// of whether we have sent an annotated bookmark event. +// +// it's important to note that we don't +// need to track the actual RV because +// we only send the bookmark event +// after the initial list call. +// +// when this variable is set to false, +// it means we don't have any specific +// preferences for delivering bookmark events. +func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEndBookmarkRequired, forceInitialEvents bool) { + if wc.initialRev > 0 && forceInitialEvents { + currentStorageRV, err := wc.watcher.getCurrentStorageRV(wc.ctx) + if err != nil { + wc.sendError(err) + return + } + if uint64(wc.initialRev) > currentStorageRV { + wc.sendError(storage.NewTooLargeResourceVersionError(uint64(wc.initialRev), currentStorageRV, int(wait.Jitter(1*time.Second, 3).Seconds()))) + return + } + } + if forceInitialEvents { if err := wc.sync(); err != nil { klog.Errorf("failed to sync with latest state: %v", err) wc.sendError(err) return } } + if initialEventsEndBookmarkRequired { + wc.sendEvent(func() *event { + e := progressNotifyEvent(wc.initialRev) + e.isInitialEventsEndBookmark = true + return e + }()) + } opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()} if wc.recursive { opts = append(opts, clientv3.WithPrefix()) @@ -388,6 +480,12 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) { klog.Errorf("failed to propagate object version: %v", err) return nil } + if e.isInitialEventsEndBookmark { + if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil { + wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err)) + return nil + } + } res = &watch.Event{ Type: watch.Bookmark, Object: object, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 902750fe1fd..4a7fe0888a2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -24,9 +24,13 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" clientv3 "go.etcd.io/etcd/client/v3" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/features" @@ -35,6 +39,7 @@ import ( storagetesting "k8s.io/apiserver/pkg/storage/testing" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/utils/ptr" ) func TestWatch(t *testing.T) { @@ -123,6 +128,16 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) { storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store) } +func TestEtcdWatchSemantics(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunWatchSemantics(ctx, t, store) +} + +func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store) +} + // ======================================================================= // Implementation-specific tests are following. // The following tests are exercising the details of the implementation @@ -145,7 +160,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { - w.run() + w.run(false, true) wg.Done() }() w.errChan <- fmt.Errorf("some error") @@ -194,6 +209,51 @@ func TestWatchErrorIncorrectConfiguration(t *testing.T) { } } +func TestTooLargeResourceVersionErrorForWatchList(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() + origCtx, store, _ := testSetup(t) + ctx, cancel := context.WithCancel(origCtx) + defer cancel() + requestOpts := storage.ListOptions{ + SendInitialEvents: ptr.To(true), + Recursive: true, + Predicate: storage.SelectionPredicate{ + Field: fields.Everything(), + Label: labels.Everything(), + AllowWatchBookmarks: true, + }, + } + var expectedErr *apierrors.StatusError + if !errors.As(storage.NewTooLargeResourceVersionError(uint64(102), 1, 0), &expectedErr) { + t.Fatalf("Unable to convert NewTooLargeResourceVersionError to apierrors.StatusError") + } + + w, err := store.watcher.Watch(ctx, "/abc", int64(102), requestOpts) + if err != nil { + t.Fatal(err) + } + defer w.Stop() + + actualEvent := <-w.ResultChan() + if actualEvent.Type != watch.Error { + t.Fatalf("Unexpected type of the event: %v, expected: %v", actualEvent.Type, watch.Error) + } + actualErr, ok := actualEvent.Object.(*metav1.Status) + if !ok { + t.Fatalf("Expected *apierrors.StatusError, got: %#v", actualEvent.Object) + } + + if actualErr.Details.RetryAfterSeconds <= 0 { + t.Fatalf("RetryAfterSeconds must be > 0, actual value: %v", actualErr.Details.RetryAfterSeconds) + } + // rewrite the Details as it contains retry seconds + // and validate the whole struct + expectedErr.ErrStatus.Details = actualErr.Details + if diff := cmp.Diff(*actualErr, expectedErr.ErrStatus); diff != "" { + t.Fatalf("Unexpected error returned, diff: %v", diff) + } +} + func TestWatchChanSync(t *testing.T) { testCases := []struct { name string