From 2ae115e5a2c9b0ce3be8b3f4b60f8acc1b7c9108 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 30 Jan 2023 11:47:16 +0100 Subject: [PATCH] Add SendInitialEvents parameter to ListOption --- .../pkg/apis/meta/internalversion/types.go | 25 +++ .../internalversion/validation/validation.go | 38 ++++- .../validation/validation_test.go | 153 ++++++++++++++++-- .../apimachinery/pkg/apis/meta/v1/types.go | 26 +++ 4 files changed, 227 insertions(+), 15 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go index a49b5f2befc..00d2b8c6891 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/types.go @@ -66,6 +66,31 @@ type ListOptions struct { // it does not recognize and will return a 410 error if the token can no longer be used because // it has expired. Continue string + + // `sendInitialEvents=true` may be set together with `watch=true`. + // In that case, the watch stream will begin with synthetic events to + // produce the current state of objects in the collection. Once all such + // events have been sent, a synthetic "Bookmark" event will be sent. + // The bookmark will report the ResourceVersion (RV) corresponding to the + // set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation. + // Afterwards, the watch stream will proceed as usual, sending watch events + // corresponding to changes (subsequent to the RV) to objects watched. + // + // When `sendInitialEvents` option is set, we require `resourceVersionMatch` + // option to also be set. The semantic of the watch request is as following: + // - `resourceVersionMatch` = NotOlderThan + // is interpreted as "data at least as new as the provided `resourceVersion`" + // and the bookmark event is send when the state is synced + // to a `resourceVersion` at least as fresh as the one provided by the ListOptions. + // If `resourceVersion` is unset, this is interpreted as "consistent read" and the + // bookmark event is send when the state is synced at least to the moment + // when request started being processed. + // - `resourceVersionMatch` set to any other value or unset + // Invalid error is returned. + // + // Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward + // compatibility reasons) and to false otherwise. + SendInitialEvents *bool } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go index 8403d1a8616..2734a8f3ba6 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go @@ -17,18 +17,20 @@ limitations under the License. package validation import ( + "fmt" + "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/validation/field" ) // ValidateListOptions returns all validation errors found while validating the ListOptions. -func ValidateListOptions(options *internalversion.ListOptions) field.ErrorList { +func ValidateListOptions(options *internalversion.ListOptions, isWatchListFeatureEnabled bool) field.ErrorList { + if options.Watch { + return validateWatchOptions(options, isWatchListFeatureEnabled) + } allErrs := field.ErrorList{} if match := options.ResourceVersionMatch; len(match) > 0 { - if options.Watch { - allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), "resourceVersionMatch is forbidden for watch")) - } if len(options.ResourceVersion) == 0 { allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), "resourceVersionMatch is forbidden unless resourceVersion is provided")) } @@ -42,5 +44,33 @@ func ValidateListOptions(options *internalversion.ListOptions) field.ErrorList { allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), "resourceVersionMatch \"exact\" is forbidden for resourceVersion \"0\"")) } } + if options.SendInitialEvents != nil { + allErrs = append(allErrs, field.Forbidden(field.NewPath("sendInitialEvents"), "sendInitialEvents is forbidden for list")) + } + return allErrs +} + +func validateWatchOptions(options *internalversion.ListOptions, isWatchListFeatureEnabled bool) field.ErrorList { + allErrs := field.ErrorList{} + match := options.ResourceVersionMatch + if options.SendInitialEvents != nil { + if match != metav1.ResourceVersionMatchNotOlderThan { + allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), fmt.Sprintf("sendInitialEvents requires setting resourceVersionMatch to %s", metav1.ResourceVersionMatchNotOlderThan))) + } + if !isWatchListFeatureEnabled { + allErrs = append(allErrs, field.Forbidden(field.NewPath("sendInitialEvents"), "sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled")) + } + } + if len(match) > 0 { + if options.SendInitialEvents == nil { + allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), "resourceVersionMatch is forbidden for watch unless sendInitialEvents is provided")) + } + if match != metav1.ResourceVersionMatchNotOlderThan { + allErrs = append(allErrs, field.NotSupported(field.NewPath("resourceVersionMatch"), match, []string{string(metav1.ResourceVersionMatchNotOlderThan)})) + } + if len(options.Continue) > 0 { + allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), "resourceVersionMatch is forbidden when continue is provided")) + } + } return allErrs } diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation_test.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation_test.go index 877bf34c7bb..d6da2298882 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation_test.go @@ -23,10 +23,15 @@ import ( ) func TestValidateListOptions(t *testing.T) { + boolPtrFn := func(b bool) *bool { + return &b + } + cases := []struct { - name string - opts internalversion.ListOptions - expectError string + name string + opts internalversion.ListOptions + watchListFeatureEnabled bool + expectErrors []string }{ { name: "valid-default", @@ -45,7 +50,7 @@ func TestValidateListOptions(t *testing.T) { ResourceVersion: "0", ResourceVersionMatch: metav1.ResourceVersionMatchExact, }, - expectError: "resourceVersionMatch: Forbidden: resourceVersionMatch \"exact\" is forbidden for resourceVersion \"0\"", + expectErrors: []string{"resourceVersionMatch: Forbidden: resourceVersionMatch \"exact\" is forbidden for resourceVersion \"0\""}, }, { name: "valid-resourceversionmatch-notolderthan", @@ -60,18 +65,144 @@ func TestValidateListOptions(t *testing.T) { ResourceVersion: "0", ResourceVersionMatch: "foo", }, - expectError: "resourceVersionMatch: Unsupported value: \"foo\": supported values: \"Exact\", \"NotOlderThan\", \"\"", + expectErrors: []string{"resourceVersionMatch: Unsupported value: \"foo\": supported values: \"Exact\", \"NotOlderThan\", \"\""}, + }, + { + name: "list-sendInitialEvents-forbidden", + opts: internalversion.ListOptions{ + SendInitialEvents: boolPtrFn(true), + }, + expectErrors: []string{"sendInitialEvents: Forbidden: sendInitialEvents is forbidden for list"}, + }, + { + name: "valid-watch-default", + opts: internalversion.ListOptions{ + Watch: true, + }, + }, + { + name: "valid-watch-sendInitialEvents-on", + opts: internalversion.ListOptions{ + Watch: true, + SendInitialEvents: boolPtrFn(true), + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + AllowWatchBookmarks: true, + }, + watchListFeatureEnabled: true, + }, + { + name: "valid-watch-sendInitialEvents-off", + opts: internalversion.ListOptions{ + Watch: true, + SendInitialEvents: boolPtrFn(false), + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + AllowWatchBookmarks: true, + }, + watchListFeatureEnabled: true, + }, + { + name: "watch-resourceversionmatch-without-sendInitialEvents-forbidden", + opts: internalversion.ListOptions{ + Watch: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + }, + expectErrors: []string{"resourceVersionMatch: Forbidden: resourceVersionMatch is forbidden for watch unless sendInitialEvents is provided"}, + }, + { + name: "watch-sendInitialEvents-without-resourceversionmatch-forbidden", + opts: internalversion.ListOptions{ + Watch: true, + SendInitialEvents: boolPtrFn(true), + }, + expectErrors: []string{"resourceVersionMatch: Forbidden: sendInitialEvents requires setting resourceVersionMatch to NotOlderThan", "sendInitialEvents: Forbidden: sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled"}, + }, + { + name: "watch-sendInitialEvents-with-exact-resourceversionmatch-forbidden", + opts: internalversion.ListOptions{ + Watch: true, + SendInitialEvents: boolPtrFn(true), + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + AllowWatchBookmarks: true, + }, + watchListFeatureEnabled: true, + expectErrors: []string{"resourceVersionMatch: Forbidden: sendInitialEvents requires setting resourceVersionMatch to NotOlderThan", "resourceVersionMatch: Unsupported value: \"Exact\": supported values: \"NotOlderThan\""}, + }, + { + name: "watch-sendInitialEvents-on-with-empty-resourceversionmatch-forbidden", + opts: internalversion.ListOptions{ + Watch: true, + SendInitialEvents: boolPtrFn(true), + ResourceVersionMatch: "", + }, + expectErrors: []string{"resourceVersionMatch: Forbidden: sendInitialEvents requires setting resourceVersionMatch to NotOlderThan", "sendInitialEvents: Forbidden: sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled"}, + }, + { + name: "watch-sendInitialEvents-off-with-empty-resourceversionmatch-forbidden", + opts: internalversion.ListOptions{ + Watch: true, + SendInitialEvents: boolPtrFn(false), + ResourceVersionMatch: "", + }, + expectErrors: []string{"resourceVersionMatch: Forbidden: sendInitialEvents requires setting resourceVersionMatch to NotOlderThan", "sendInitialEvents: Forbidden: sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled"}, + }, + { + name: "watch-sendInitialEvents-with-incorrect-resourceversionmatch-forbidden", + opts: internalversion.ListOptions{ + Watch: true, + SendInitialEvents: boolPtrFn(true), + ResourceVersionMatch: "incorrect", + AllowWatchBookmarks: true, + }, + watchListFeatureEnabled: true, + expectErrors: []string{"resourceVersionMatch: Forbidden: sendInitialEvents requires setting resourceVersionMatch to NotOlderThan", "resourceVersionMatch: Unsupported value: \"incorrect\": supported values: \"NotOlderThan\""}, + }, + { + // note that validating allowWatchBookmarks would break backward compatibility + // because it was possible to request initial events via resourceVersion=0 before this change + name: "watch-sendInitialEvents-no-allowWatchBookmark", + opts: internalversion.ListOptions{ + Watch: true, + SendInitialEvents: boolPtrFn(true), + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + }, + watchListFeatureEnabled: true, + }, + { + name: "watch-sendInitialEvents-no-watchlist-fg-disabled", + opts: internalversion.ListOptions{ + Watch: true, + SendInitialEvents: boolPtrFn(true), + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + AllowWatchBookmarks: true, + }, + expectErrors: []string{"sendInitialEvents: Forbidden: sendInitialEvents is forbidden for watch unless the WatchList feature gate is enabled"}, + }, + { + name: "watch-sendInitialEvents-no-watchlist-fg-disabled", + opts: internalversion.ListOptions{ + Watch: true, + SendInitialEvents: boolPtrFn(true), + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + AllowWatchBookmarks: true, + Continue: "123", + }, + watchListFeatureEnabled: true, + expectErrors: []string{"resourceVersionMatch: Forbidden: resourceVersionMatch is forbidden when continue is provided"}, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - errs := ValidateListOptions(&tc.opts) - if tc.expectError != "" { - if len(errs) != 1 { - t.Errorf("expected an error but got %d errors", len(errs)) - } else if errs[0].Error() != tc.expectError { - t.Errorf("expected error '%s' but got '%s'", tc.expectError, errs[0].Error()) + errs := ValidateListOptions(&tc.opts, tc.watchListFeatureEnabled) + if len(tc.expectErrors) > 0 { + if len(errs) != len(tc.expectErrors) { + t.Errorf("expected %d errors but got %d errors", len(tc.expectErrors), len(errs)) + return + } + for i, expectedErr := range tc.expectErrors { + if expectedErr != errs[i].Error() { + t.Errorf("expected error '%s' but got '%s'", expectedErr, errs[i].Error()) + } } return } diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go index 2a6fd74ca00..8fd047b0b94 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go @@ -400,6 +400,32 @@ type ListOptions struct { // This field is not supported when watch is true. Clients may start a watch from the last // resourceVersion value returned by the server and not miss any modifications. Continue string `json:"continue,omitempty" protobuf:"bytes,8,opt,name=continue"` + + // `sendInitialEvents=true` may be set together with `watch=true`. + // In that case, the watch stream will begin with synthetic events to + // produce the current state of objects in the collection. Once all such + // events have been sent, a synthetic "Bookmark" event will be sent. + // The bookmark will report the ResourceVersion (RV) corresponding to the + // set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation. + // Afterwards, the watch stream will proceed as usual, sending watch events + // corresponding to changes (subsequent to the RV) to objects watched. + // + // When `sendInitialEvents` option is set, we require `resourceVersionMatch` + // option to also be set. The semantic of the watch request is as following: + // - `resourceVersionMatch` = NotOlderThan + // is interpreted as "data at least as new as the provided `resourceVersion`" + // and the bookmark event is send when the state is synced + // to a `resourceVersion` at least as fresh as the one provided by the ListOptions. + // If `resourceVersion` is unset, this is interpreted as "consistent read" and the + // bookmark event is send when the state is synced at least to the moment + // when request started being processed. + // - `resourceVersionMatch` set to any other value or unset + // Invalid error is returned. + // + // Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward + // compatibility reasons) and to false otherwise. + // +optional + SendInitialEvents *bool `json:"sendInitialEvents,omitempty" protobuf:"varint,11,opt,name=sendInitialEvents"` } // resourceVersionMatch specifies how the resourceVersion parameter is applied. resourceVersionMatch