diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go index 3cc4a728a57..5ac2ff644c0 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go @@ -45,6 +45,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/tracing" "k8s.io/klog/v2" + "k8s.io/utils/ptr" ) // getterFunc performs a get request with the given context and object name. The request @@ -185,15 +186,8 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc if err != nil { hasName = false } - ctx = request.WithNamespace(ctx, namespace) - outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope) - if err != nil { - scope.err(err, w, req) - return - } - opts := metainternalversion.ListOptions{} if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, &opts); err != nil { err = errors.NewBadRequest(err.Error()) @@ -208,6 +202,17 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc return } + var restrictions negotiation.EndpointRestrictions + restrictions = scope + if isListWatchRequest(opts) { + restrictions = &watchListEndpointRestrictions{scope} + } + outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, restrictions) + if err != nil { + scope.err(err, w, req) + return + } + // transform fields // TODO: DecodeParametersInto should do this. if opts.FieldSelector != nil { @@ -307,3 +312,18 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc transformResponseObject(ctx, scope, req, w, http.StatusOK, outputMediaType, result) } } + +type watchListEndpointRestrictions struct { + negotiation.EndpointRestrictions +} + +func (e *watchListEndpointRestrictions) AllowsMediaTypeTransform(mimeType, mimeSubType string, target *schema.GroupVersionKind) bool { + if target != nil && target.Kind == "Table" { + return false + } + return e.EndpointRestrictions.AllowsMediaTypeTransform(mimeType, mimeSubType, target) +} + +func isListWatchRequest(opts metainternalversion.ListOptions) bool { + return utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && ptr.Deref(opts.SendInitialEvents, false) && opts.AllowWatchBookmarks +} diff --git a/staging/src/k8s.io/client-go/util/consistencydetector/list_data_consistency_detector.go b/staging/src/k8s.io/client-go/util/consistencydetector/list_data_consistency_detector.go index 7610c05c28a..61b8fe28b9a 100644 --- a/staging/src/k8s.io/client-go/util/consistencydetector/list_data_consistency_detector.go +++ b/staging/src/k8s.io/client-go/util/consistencydetector/list_data_consistency_detector.go @@ -32,6 +32,12 @@ func init() { dataConsistencyDetectionForListFromCacheEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_LIST_FROM_CACHE_INCONSISTENCY_DETECTOR")) } +// IsDataConsistencyDetectionForListEnabled returns true when +// the KUBE_LIST_FROM_CACHE_INCONSISTENCY_DETECTOR environment variable was set during a binary startup. +func IsDataConsistencyDetectionForListEnabled() bool { + return dataConsistencyDetectionForListFromCacheEnabled +} + // CheckListFromCacheDataConsistencyIfRequested performs a data consistency check only when // the KUBE_LIST_FROM_CACHE_INCONSISTENCY_DETECTOR environment variable was set during a binary startup // for requests that have a high chance of being served from the watch-cache. @@ -50,7 +56,7 @@ func init() { // the cache (even though this might not be true for some requests) // and issue the second call to get data from etcd for comparison. func CheckListFromCacheDataConsistencyIfRequested[T runtime.Object](ctx context.Context, identity string, listItemsFn ListFunc[T], optionsUsedToReceiveList metav1.ListOptions, receivedList runtime.Object) { - if !dataConsistencyDetectionForListFromCacheEnabled { + if !IsDataConsistencyDetectionForListEnabled() { return } checkListFromCacheDataConsistencyIfRequestedInternal(ctx, identity, listItemsFn, optionsUsedToReceiveList, receivedList) diff --git a/test/e2e/apimachinery/watchlist.go b/test/e2e/apimachinery/watchlist.go index 46541757fd5..5a5f5abe08b 100644 --- a/test/e2e/apimachinery/watchlist.go +++ b/test/e2e/apimachinery/watchlist.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "sort" + "strings" "time" "github.com/google/go-cmp/cmp" @@ -117,14 +118,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true) ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name)) - var expectedSecrets []unstructured.Unstructured - for i := 1; i <= 5; i++ { - unstructuredSecret, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newSecret(fmt.Sprintf("secret-%d", i))) - framework.ExpectNoError(err) - secret, err := f.DynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).Create(ctx, &unstructured.Unstructured{Object: unstructuredSecret}, metav1.CreateOptions{}) - framework.ExpectNoError(err) - expectedSecrets = append(expectedSecrets, *secret) - } + expectedSecrets := addWellKnownUnstructuredSecrets(ctx, f) rt, clientConfig := clientConfigWithRoundTripper(f) wrappedDynamicClient, err := dynamic.NewForConfig(clientConfig) @@ -171,16 +165,86 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe expectedRequestMadeByMetaClient := getExpectedRequestMadeByClientFor(secretMetaList.GetResourceVersion()) gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByMetaClient)) }) + + // Validates unsupported Accept headers in WatchList. + // Sets AcceptContentType to "application/json;as=Table", which the API doesn't support, returning a 406 error. + // After the 406, the client falls back to a regular list request. + ginkgo.It("doesn't support receiving resources as Tables", func(ctx context.Context) { + featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true) + + ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name)) + _ = addWellKnownUnstructuredSecrets(ctx, f) + + rt, clientConfig := clientConfigWithRoundTripper(f) + modifiedClientConfig := dynamic.ConfigFor(clientConfig) + modifiedClientConfig.AcceptContentTypes = strings.Join([]string{ + fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1.SchemeGroupVersion.Version, metav1.GroupName), + }, ",") + modifiedClientConfig.GroupVersion = &v1.SchemeGroupVersion + restClient, err := rest.RESTClientFor(modifiedClientConfig) + framework.ExpectNoError(err) + wrappedDynamicClient := dynamic.New(restClient) + + // note that the client in case of an error (406) will fall back + // to a standard list request thus the overall call passes + ginkgo.By("Streaming secrets as Table from the server") + secretTable, err := wrappedDynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err) + gomega.Expect(secretTable.GetObjectKind().GroupVersionKind()).To(gomega.Equal(metav1.SchemeGroupVersion.WithKind("Table"))) + + ginkgo.By("Verifying if expected response was sent by the server") + gomega.Expect(rt.actualResponseStatuses[0]).To(gomega.Equal("406 Not Acceptable")) + expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientWhenFallbackToListFor(secretTable.GetResourceVersion()) + gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByDynamicClient)) + + }) + + // Sets AcceptContentType to both "application/json;as=Table" and "application/json". + // Unlike the previous test, no 406 error occurs, as the API falls back to "application/json" and returns a valid response. + ginkgo.It("falls backs to supported content type when when receiving resources as Tables was requested", func(ctx context.Context) { + featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true) + + ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name)) + expectedSecrets := addWellKnownUnstructuredSecrets(ctx, f) + + rt, clientConfig := clientConfigWithRoundTripper(f) + modifiedClientConfig := dynamic.ConfigFor(clientConfig) + modifiedClientConfig.AcceptContentTypes = strings.Join([]string{ + fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1.SchemeGroupVersion.Version, metav1.GroupName), + "application/json", + }, ",") + modifiedClientConfig.GroupVersion = &v1.SchemeGroupVersion + restClient, err := rest.RESTClientFor(modifiedClientConfig) + framework.ExpectNoError(err) + wrappedDynamicClient := dynamic.New(restClient) + + ginkgo.By("Streaming secrets from the server") + secretList, err := wrappedDynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Verifying if the secret list was properly streamed") + streamedSecrets := secretList.Items + gomega.Expect(cmp.Equal(expectedSecrets, streamedSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data")) + + ginkgo.By("Verifying if expected requests were sent to the server") + expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientFor(secretList.GetResourceVersion()) + gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByDynamicClient)) + }) }) type roundTripper struct { - actualRequests []string - delegate http.RoundTripper + actualRequests []string + actualResponseStatuses []string + delegate http.RoundTripper } func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { r.actualRequests = append(r.actualRequests, req.URL.RawQuery) - return r.delegate.RoundTrip(req) + rsp, err := r.delegate.RoundTrip(req) + if rsp != nil { + r.actualResponseStatuses = append(r.actualResponseStatuses, rsp.Status) + } + return rsp, err } func (r *roundTripper) Wrap(delegate http.RoundTripper) http.RoundTripper { @@ -211,10 +275,12 @@ func verifyStore(ctx context.Context, expectedSecrets []v1.Secret, store cache.S framework.ExpectNoError(err) } +// corresponds to a streaming request made by the client to stream the secrets +const expectedStreamingRequestMadeByClient string = "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true" + func getExpectedRequestMadeByClientFor(rv string) []string { expectedRequestMadeByClient := []string{ - // corresponds to a streaming request made by the client to stream the secrets - "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true", + expectedStreamingRequestMadeByClient, } if consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() { // corresponds to a standard list request made by the consistency detector build in into the client @@ -223,6 +289,19 @@ func getExpectedRequestMadeByClientFor(rv string) []string { return expectedRequestMadeByClient } +func getExpectedRequestMadeByClientWhenFallbackToListFor(rv string) []string { + expectedRequestMadeByClient := []string{ + expectedStreamingRequestMadeByClient, + // corresponds to a list request made by the client + "", + } + if consistencydetector.IsDataConsistencyDetectionForListEnabled() { + // corresponds to a standard list request made by the consistency detector build in into the client + expectedRequestMadeByClient = append(expectedRequestMadeByClient, fmt.Sprintf("resourceVersion=%s&resourceVersionMatch=Exact", rv)) + } + return expectedRequestMadeByClient +} + func addWellKnownSecrets(ctx context.Context, f *framework.Framework) []v1.Secret { ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name)) var secrets []v1.Secret @@ -234,6 +313,20 @@ func addWellKnownSecrets(ctx context.Context, f *framework.Framework) []v1.Secre return secrets } +// addWellKnownUnstructuredSecrets exists because secrets from addWellKnownSecrets +// don't have type info and cannot be converted. +func addWellKnownUnstructuredSecrets(ctx context.Context, f *framework.Framework) []unstructured.Unstructured { + var secrets []unstructured.Unstructured + for i := 1; i <= 5; i++ { + unstructuredSecret, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newSecret(fmt.Sprintf("secret-%d", i))) + framework.ExpectNoError(err) + secret, err := f.DynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).Create(ctx, &unstructured.Unstructured{Object: unstructuredSecret}, metav1.CreateOptions{}) + framework.ExpectNoError(err) + secrets = append(secrets, *secret) + } + return secrets +} + type byName []v1.Secret func (a byName) Len() int { return len(a) }