From f2de1a00b8fd7a22f9d0b2c1ace69be41d304f83 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 8 May 2023 13:04:31 +0200 Subject: [PATCH] storage/etcd: skip SendInitialEvents if the request is backward compatible otherwise an error will be returned. backward compatibility is defined as RV = "" || RV = "O" and AllowWatchBookmark is set to false. in that case we rely on https://github.com/kubernetes/kubernetes/blob/267eb25e60955fe8e438c6311412e7cf7d028acb/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go#L260 --- .../k8s.io/apiserver/pkg/storage/etcd3/store.go | 6 +++++- .../apiserver/pkg/storage/etcd3/watcher_test.go | 5 +++++ .../pkg/storage/testing/watcher_tests.go | 15 +++++++++++++++ .../apiserver/pkg/storage/tests/cacher_test.go | 6 ++++++ 4 files changed, 31 insertions(+), 1 deletion(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 0814513a64c..aab39054477 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -865,8 +865,12 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) { } // Watch implements storage.Interface.Watch. +// TODO(#115478): In order to graduate the WatchList feature to beta, the etcd3 implementation must/should also support it. func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { - if opts.SendInitialEvents != nil { + // it is safe to skip SendInitialEvents if the request is backward compatible + // see https://github.com/kubernetes/kubernetes/blob/267eb25e60955fe8e438c6311412e7cf7d028acb/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go#L260 + compatibility := opts.Predicate.AllowWatchBookmarks == false && (opts.ResourceVersion == "" || opts.ResourceVersion == "0") + if opts.SendInitialEvents != nil && !compatibility { return nil, apierrors.NewInvalid( schema.GroupKind{Group: s.groupResource.Group, Kind: s.groupResource.Resource}, "", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 3d095c5ad0e..180df5d5003 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -99,6 +99,11 @@ func TestProgressNotify(t *testing.T) { storagetesting.RunOptionalTestProgressNotify(ctx, t, store) } +func TestSendInitialEventsBackwardCompatibility(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store) +} + // ======================================================================= // Implementation-specific tests are following. // The following tests are exercising the details of the implementation diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index d42364a93bb..1a88249c77c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -34,6 +36,7 @@ import ( "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/value" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + "k8s.io/utils/pointer" ) func RunTestWatch(ctx context.Context, t *testing.T, store storage.Interface) { @@ -1167,6 +1170,18 @@ func RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx context.Context } } +// RunSendInitialEventsBackwardCompatibility test backward compatibility +// when SendInitialEvents option is set against various implementations. +// Backward compatibility is defined as RV = "" || RV = "O" and AllowWatchBookmark is set to false. +// In that case we expect a watch request to be established. +func RunSendInitialEventsBackwardCompatibility(ctx context.Context, t *testing.T, store storage.Interface) { + opts := storage.ListOptions{Predicate: storage.Everything} + opts.SendInitialEvents = pointer.Bool(true) + w, err := store.Watch(ctx, "/pods", opts) + require.NoError(t, err) + w.Stop() +} + type testWatchStruct struct { obj *example.Pod expectEvent bool diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 5c2577c5597..6bb30764028 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -498,6 +498,12 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) { storagetesting.RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx, t, cacher) } +func TestSendInitialEventsBackwardCompatibility(t *testing.T) { + ctx, store, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store) +} + // =================================================== // Test-setup related function are following. // ===================================================