storage/etcd: skip SendInitialEvents if the request is backward compatible

otherwise an error will be returned.
backward compatibility is defined as RV = "" || RV = "O" and AllowWatchBookmark is set to false.
in that case we rely on 267eb25e60/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go (L260)
This commit is contained in:
Lukasz Szaszkiewicz 2023-05-08 13:04:31 +02:00
parent f0417ac850
commit f2de1a00b8
4 changed files with 31 additions and 1 deletions

View File

@ -865,8 +865,12 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
}
// Watch implements storage.Interface.Watch.
// TODO(#115478): In order to graduate the WatchList feature to beta, the etcd3 implementation must/should also support it.
func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
if opts.SendInitialEvents != nil {
// it is safe to skip SendInitialEvents if the request is backward compatible
// see https://github.com/kubernetes/kubernetes/blob/267eb25e60955fe8e438c6311412e7cf7d028acb/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go#L260
compatibility := opts.Predicate.AllowWatchBookmarks == false && (opts.ResourceVersion == "" || opts.ResourceVersion == "0")
if opts.SendInitialEvents != nil && !compatibility {
return nil, apierrors.NewInvalid(
schema.GroupKind{Group: s.groupResource.Group, Kind: s.groupResource.Resource},
"",

View File

@ -99,6 +99,11 @@ func TestProgressNotify(t *testing.T) {
storagetesting.RunOptionalTestProgressNotify(ctx, t, store)
}
func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
}
// =======================================================================
// Implementation-specific tests are following.
// The following tests are exercising the details of the implementation

View File

@ -23,6 +23,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@ -34,6 +36,7 @@ import (
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/value"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/utils/pointer"
)
func RunTestWatch(ctx context.Context, t *testing.T, store storage.Interface) {
@ -1167,6 +1170,18 @@ func RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx context.Context
}
}
// RunSendInitialEventsBackwardCompatibility test backward compatibility
// when SendInitialEvents option is set against various implementations.
// Backward compatibility is defined as RV = "" || RV = "O" and AllowWatchBookmark is set to false.
// In that case we expect a watch request to be established.
func RunSendInitialEventsBackwardCompatibility(ctx context.Context, t *testing.T, store storage.Interface) {
opts := storage.ListOptions{Predicate: storage.Everything}
opts.SendInitialEvents = pointer.Bool(true)
w, err := store.Watch(ctx, "/pods", opts)
require.NoError(t, err)
w.Stop()
}
type testWatchStruct struct {
obj *example.Pod
expectEvent bool

View File

@ -498,6 +498,12 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
storagetesting.RunTestOptionalWatchBookmarksWithCorrectResourceVersion(ctx, t, cacher)
}
func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
ctx, store, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
}
// ===================================================
// Test-setup related function are following.
// ===================================================